This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 42af41d MINOR: Caching layer should forward record timestamp (#5423)
42af41d is described below
commit 42af41d5fc991c392b75396352903b4d919da5d3
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Jul 26 09:31:02 2018 -0700
MINOR: Caching layer should forward record timestamp (#5423)
Reviewers: Guozhang Wang <[email protected]>
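This change fixes the timestamp that the Streams caching layer attaches to
records it forwards downstream on cache flush/eviction: CachingSessionStore
previously stamped cached entries with the session window end
(key.window().end()) and CachingWindowStore with the timestamp handed to the
windowed put() call; both now forward context.timestamp(), i.e. the timestamp
of the input record that triggered the update.

A minimal sketch of how the forwarded timestamp can be observed downstream of
a cached windowed aggregation, mirroring the integration test updated below
(the topic name "input" and the println are illustrative, not part of this
commit):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;
    import org.apache.kafka.streams.processor.ProcessorContext;

    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(500L))
        .count()
        .toStream()
        .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, Object> transform(final Windowed<String> key, final Long value) {
                // With this fix, context.timestamp() is the timestamp of the record
                // that triggered the aggregation update, not a window boundary.
                System.out.println(key + " -> " + value + " @ " + context.timestamp());
                return null;
            }

            @Override
            public void close() {}
        });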
---
.../state/internals/CachingSessionStore.java | 2 +-
.../state/internals/CachingWindowStore.java | 2 +-
.../KStreamAggregationIntegrationTest.java | 274 +++++++++------------
.../integration/utils/IntegrationTestUtils.java | 118 ++++++---
4 files changed, 202 insertions(+), 194 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 6950693..c307f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -147,7 +147,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
context.headers(),
true,
context.offset(),
- key.window().end(),
+ context.timestamp(),
context.partition(),
context.topic());
cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
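For session stores the difference matters once records arrive out of order; a
hedged illustration (timestamps are made up, not taken from this commit):

    // A record at t=10 creates session window [10, 10]; a late record at t=5
    // then merges it into session window [5, 10].
    final long firstRecordTs = 10L;
    final long lateRecordTs = 5L;
    // Before: the cached entry was stamped with key.window().end() == 10.
    // After:  it is stamped with context.timestamp() == 5, the timestamp of
    //         the late record that triggered this particular cache update.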
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 1f08f51..07120df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -156,7 +156,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
context.headers(),
true,
context.offset(),
- timestamp,
+ context.timestamp(),
context.partition(),
context.topic());
cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
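For window stores, the removed `timestamp` is the value handed to the windowed
put() call, which in this code path is the window start rather than the
record's own timestamp. A hedged illustration using the 500 ms windows from
the updated test below (values are made up):

    // A record at t=742 falls into the 500 ms window [500, 1000).
    final long recordTs = 742L;
    final long windowStart = recordTs / 500 * 500;  // == 500
    // Before: the cached entry carried windowStart (500).
    // After:  it carries context.timestamp() (742), so the aggregation result
    //         forwarded downstream keeps the triggering record's timestamp.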
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 10363f8..a29332c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -36,7 +35,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
@@ -49,16 +48,16 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
@@ -136,24 +135,9 @@ public class KStreamAggregationIntegrationTest {
mapper,
Serialized.with(Serdes.String(), Serdes.String()));
- reducer = new Reducer<String>() {
- @Override
- public String apply(final String value1, final String value2) {
- return value1 + ":" + value2;
- }
- };
- initializer = new Initializer<Integer>() {
- @Override
- public Integer apply() {
- return 0;
- }
- };
- aggregator = new Aggregator<String, String, Integer>() {
- @Override
- public Integer apply(final String aggKey, final String value, final Integer aggregate) {
- return aggregate + value.length();
- }
- };
+ reducer = (value1, value2) -> value1 + ":" + value2;
+ initializer = () -> 0;
+ aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
}
@After
@@ -181,12 +165,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
10);
- Collections.sort(results, new Comparator<KeyValue<String, String>>() {
- @Override
- public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) {
- return KStreamAggregationIntegrationTest.compare(o1, o2);
- }
- });
+ Collections.sort(results, KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
KeyValue.pair("A", "A:A"),
@@ -218,7 +197,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
produceMessages(secondBatchTimestamp);
- Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+ final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream
.windowedBy(TimeWindows.of(500L))
.reduce(reducer)
@@ -228,34 +207,28 @@ public class KStreamAggregationIntegrationTest {
startStreams();
final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
- new TimeWindowedDeserializer<String>(),
+ new TimeWindowedDeserializer<>(),
new StringDeserializer(),
String.class,
15);
// read from ConsoleConsumer
- String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
- new TimeWindowedDeserializer<String>(),
- new StringDeserializer(),
- String.class,
- 15);
+ final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+ new TimeWindowedDeserializer<String>(),
+ new StringDeserializer(),
+ String.class,
+ 15,
+ false);
final Comparator<KeyValue<Windowed<String>, String>>
comparator =
- new Comparator<KeyValue<Windowed<String>, String>>() {
- @Override
- public int compare(final KeyValue<Windowed<String>, String> o1,
- final KeyValue<Windowed<String>, String> o2) {
- final int keyComparison = o1.key.key().compareTo(o2.key.key());
- return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
- }
- };
+ Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
Collections.sort(windowedOutput, comparator);
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
- List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
+ final List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
@@ -274,13 +247,13 @@ public class KStreamAggregationIntegrationTest {
);
assertThat(windowedOutput, is(expectResult));
- Set<String> expectResultString = new HashSet<>(expectResult.size());
- for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
+ final Set<String> expectResultString = new HashSet<>(expectResult.size());
+ for (final KeyValue<Windowed<String>, String> eachRecord: expectResult) {
expectResultString.add(eachRecord.toString());
}
// check every message is contained in the expect result
- String[] allRecords = resultFromConsoleConsumer.split("\n");
+ final String[] allRecords = resultFromConsoleConsumer.split("\n");
for (String record: allRecords) {
record = "KeyValue(" + record + ")";
assertTrue(expectResultString.contains(record));
@@ -306,12 +279,7 @@ public class KStreamAggregationIntegrationTest {
new IntegerDeserializer(),
10);
- Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
- @Override
- public int compare(final KeyValue<String, Integer> o1, final KeyValue<String, Integer> o2) {
- return KStreamAggregationIntegrationTest.compare(o1, o2);
- }
- });
+ Collections.sort(results, KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1),
@@ -336,75 +304,68 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
produceMessages(secondTimestamp);
- Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+ final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate(
initializer,
aggregator,
- Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
+ Materialized.with(null, Serdes.Integer())
)
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
startStreams();
- final List<KeyValue<Windowed<String>, Integer>> windowedMessages = receiveMessages(
- new TimeWindowedDeserializer<String>(),
+ final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(),
new IntegerDeserializer(),
String.class,
15);
// read from ConsoleConsumer
- String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
- new TimeWindowedDeserializer<String>(),
- new IntegerDeserializer(),
- String.class,
- 15);
+ final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+ new TimeWindowedDeserializer<String>(),
+ new IntegerDeserializer(),
+ String.class,
+ 15,
+ true);
- final Comparator<KeyValue<Windowed<String>, Integer>>
+ final Comparator<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
comparator =
- new Comparator<KeyValue<Windowed<String>, Integer>>() {
- @Override
- public int compare(final KeyValue<Windowed<String>, Integer> o1,
- final KeyValue<Windowed<String>, Integer> o2) {
- final int keyComparison = o1.key.key().compareTo(o2.key.key());
- return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
- }
- };
+ Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
Collections.sort(windowedMessages, comparator);
final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500;
- List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
- new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("A", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("A", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("B", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("B", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("C", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("C", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("D", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("D", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow,
Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("E", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
- new KeyValue<>(new Windowed<>("E", new
TimeWindow(secondWindow, Long.MAX_VALUE)), 2));
+ final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
expectResult = Arrays.asList(
+ new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+ new KeyValue<>(new Windowed<>("A", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("A", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+ new KeyValue<>(new Windowed<>("B", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("B", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+ new KeyValue<>(new Windowed<>("C", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("C", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+ new KeyValue<>(new Windowed<>("D", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("D", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow,
Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
+ new KeyValue<>(new Windowed<>("E", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
+ new KeyValue<>(new Windowed<>("E", new
TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)));
assertThat(windowedMessages, is(expectResult));
- Set<String> expectResultString = new HashSet<>(expectResult.size());
- for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
- expectResultString.add(eachRecord.toString());
+ final Set<String> expectResultString = new HashSet<>(expectResult.size());
+ for (final KeyValue<Windowed<String>, KeyValue<Integer, Long>> eachRecord: expectResult) {
+ expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString() + ", " + eachRecord.value.key);
}
// check every message is contained in the expect result
- String[] allRecords = resultFromConsoleConsumer.split("\n");
- for (String record: allRecords) {
- record = "KeyValue(" + record + ")";
+ final String[] allRecords = resultFromConsoleConsumer.split("\n");
+ for (final String record: allRecords) {
assertTrue(expectResultString.contains(record));
}
@@ -419,12 +380,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10);
- Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
- @Override
- public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
- return KStreamAggregationIntegrationTest.compare(o1, o2);
- }
- });
+ Collections.sort(results, KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1L),
@@ -444,7 +400,7 @@ public class KStreamAggregationIntegrationTest {
public void shouldCount() throws Exception {
produceMessages(mockTime.milliseconds());
- groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-by-key"))
+ groupedStream.count(Materialized.as("count-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
@@ -471,12 +427,7 @@ public class KStreamAggregationIntegrationTest {
stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
.windowedBy(TimeWindows.of(500L))
.count()
- .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
- @Override
- public String apply(final Windowed<Integer> windowedKey, final Long value) {
- return windowedKey.key() + "@" + windowedKey.window().start();
- }
- }).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+ .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
startStreams();
@@ -484,12 +435,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10);
- Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
- @Override
- public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
- return KStreamAggregationIntegrationTest.compare(o1, o2);
- }
- });
+ Collections.sort(results, KStreamAggregationIntegrationTest::compare);
final long window = timestamp / 500 * 500;
assertThat(results, is(Arrays.asList(
@@ -568,7 +514,7 @@ public class KStreamAggregationIntegrationTest {
new Properties()),
t4);
- final Map<Windowed<String>, Long> results = new HashMap<>();
+ final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(11);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
@@ -576,23 +522,34 @@ public class KStreamAggregationIntegrationTest {
.windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
.count()
.toStream()
- .foreach(new ForeachAction<Windowed<String>, Long>() {
- @Override
- public void apply(final Windowed<String> key, final Long value) {
- results.put(key, value);
- latch.countDown();
- }
- });
+ .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
+ private ProcessorContext context;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public KeyValue<Object, Object> transform(final Windowed<String> key, final Long value) {
+ results.put(key, KeyValue.pair(value, context.timestamp()));
+ latch.countDown();
+ return null;
+ }
+
+ @Override
+ public void close() {}
+ });
startStreams();
latch.await(30, TimeUnit.SECONDS);
- assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1,
t1))), equalTo(1L));
- assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1,
t1))), equalTo(1L));
- assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1,
t1))), equalTo(1L));
- assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4,
t4))), equalTo(1L));
- assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1,
t2))), equalTo(2L));
- assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3,
t4))), equalTo(2L));
- assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3,
t3))), equalTo(1L));
+ assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1,
t1))), equalTo(KeyValue.pair(1L, t1)));
+ assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1,
t1))), equalTo(KeyValue.pair(1L, t1)));
+ assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1,
t1))), equalTo(KeyValue.pair(1L, t1)));
+ assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4,
t4))), equalTo(KeyValue.pair(1L, t4)));
+ assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1,
t2))), equalTo(KeyValue.pair(2L, t2)));
+ assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3,
t4))), equalTo(KeyValue.pair(2L, t4)));
+ assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3,
t3))), equalTo(KeyValue.pair(1L, t3)));
}
@Test
@@ -662,25 +619,17 @@ public class KStreamAggregationIntegrationTest {
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
- .reduce(new Reducer<String>() {
- @Override
- public String apply(final String value1, final String value2) {
- return value1 + ":" + value2;
- }
- }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(userSessionsStore))
+ .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
.toStream()
- .foreach(new ForeachAction<Windowed<String>, String>() {
- @Override
- public void apply(final Windowed<String> key, final String value) {
- results.put(key, value);
- latch.countDown();
- }
+ .foreach((key, value) -> {
+ results.put(key, value);
+ latch.countDown();
});
startStreams();
latch.await(30, TimeUnit.SECONDS);
final ReadOnlySessionStore<String, String> sessionStore
- = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.<String, String>sessionStore());
+ = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
// verify correct data received
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1,
t1))), equalTo("start"));
@@ -732,16 +681,14 @@ public class KStreamAggregationIntegrationTest {
}
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer,
- final int numMessages)
+ final Deserializer<V> valueDeserializer,
+ final int numMessages)
throws InterruptedException {
return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
}
- private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
- keyDeserializer,
- final Deserializer<V>
- valueDeserializer,
+ private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages)
throws InterruptedException {
final Properties consumerProperties = new Properties();
@@ -761,21 +708,44 @@ public class KStreamAggregationIntegrationTest {
60 * 1000);
}
+ private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final Class innerClass,
+ final int numMessages) throws InterruptedException {
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
+ consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+ if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
+ consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+ Serdes.serdeFrom(innerClass).getClass().getName());
+ }
+ return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+ consumerProperties,
+ outputTopic,
+ numMessages,
+ 60 * 1000);
+ }
+
private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
- final int numMessages) {
- ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
- PrintStream originalStream = System.out;
- try (PrintStream newStream = new PrintStream(newConsole)) {
+ final int numMessages,
+ final boolean printTimestamp) {
+ final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
+ final PrintStream originalStream = System.out;
+ try (final PrintStream newStream = new PrintStream(newConsole)) {
System.setOut(newStream);
- String keySeparator = ", ";
+ final String keySeparator = ", ";
// manually construct the console consumer argument array
- String[] args = new String[] {
+ final String[] args = new String[] {
"--bootstrap-server", CLUSTER.bootstrapServers(),
"--from-beginning",
"--property", "print.key=true",
+ "--property", "print.timestamp=" + printTimestamp,
"--topic", outputTopic,
"--max-messages", String.valueOf(numMessages),
"--property", "key.deserializer=" +
keyDeserializer.getClass().getName(),
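With print.timestamp=true the console consumer prefixes every line with the
timestamp type and value, which is the format the updated assertions above
reconstruct ("CreateTime:<timestamp>, <windowed key>, <value>"). A hedged
example of one such output line (timestamp and window values are
illustrative):

    // Windowed<String> keys render via toString() as [key@start/end]:
    final String exampleLine =
        "CreateTime:1532000000742, [A@1532000000500/9223372036854775807], 2";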
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index a0b8f3d..749d748 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -192,7 +192,7 @@ public class IntegrationTestUtils {
final Long timestamp,
final boolean enabledTransactions)
throws ExecutionException, InterruptedException {
- try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+ try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
if (enabledTransactions) {
producer.initTransactions();
producer.beginTransaction();
@@ -310,14 +310,39 @@ public class IntegrationTestUtils {
final long waitTime) throws InterruptedException {
final List<KeyValue<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
- final TestCondition valuesRead = new TestCondition() {
- @Override
- public boolean conditionMet() {
- final List<KeyValue<K, V>> readData =
- readKeyValues(topic, consumer, waitTime, expectedNumRecords);
- accumData.addAll(readData);
- return accumData.size() >= expectedNumRecords;
- }
+ final TestCondition valuesRead = () -> {
+ final List<KeyValue<K, V>> readData =
+ readKeyValues(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
+ };
+ final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+ TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+ }
+ return accumData;
+ }
+
+ /**
+ * Wait until enough data (key-value records) has been consumed.
+ *
+ * @param consumerConfig Kafka Consumer configuration
+ * @param topic Topic to consume from
+ * @param expectedNumRecords Minimum number of expected records
+ * @param waitTime Upper bound in waiting time in milliseconds
+ * @return All the records consumed, or null if no records are consumed
+ * @throws AssertionError if the given wait time elapses
+ */
+ public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords,
+ final long waitTime) throws InterruptedException {
+ final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+ try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+ final TestCondition valuesRead = () -> {
+ final List<KeyValue<K, KeyValue<V, Long>>> readData =
+ readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
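A hedged usage sketch of the new helper from a test (consumer properties as
built by receiveMessagesWithTimestamp above; the topic name is illustrative):

    // Each returned element pairs the record key with (value, record timestamp).
    final List<KeyValue<String, KeyValue<Long, Long>>> received =
        IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerProperties, "output-topic", 15, 60 * 1000);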
@@ -331,14 +356,11 @@ public class IntegrationTestUtils {
final long waitTime) throws InterruptedException {
final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
- final TestCondition valuesRead = new TestCondition() {
- @Override
- public boolean conditionMet() {
- final List<ConsumerRecord<K, V>> readData =
- readRecords(topic, consumer, waitTime, expectedNumRecords);
- accumData.addAll(readData);
- return accumData.size() >= expectedNumRecords;
- }
+ final TestCondition valuesRead = () -> {
+ final List<ConsumerRecord<K, V>> readData =
+ readRecords(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@@ -369,14 +391,11 @@ public class IntegrationTestUtils {
final long waitTime) throws InterruptedException {
final List<V> accumData = new ArrayList<>();
try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
- final TestCondition valuesRead = new TestCondition() {
- @Override
- public boolean conditionMet() {
- final List<V> readData =
- readValues(topic, consumer, waitTime, expectedNumRecords);
- accumData.addAll(readData);
- return accumData.size() >= expectedNumRecords;
- }
+ final TestCondition valuesRead = () -> {
+ final List<V> readData =
+ readValues(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@@ -401,23 +420,20 @@ public class IntegrationTestUtils {
final String topic,
final int partition,
final long timeout)
throws InterruptedException {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- for (final KafkaServer server : servers) {
- final MetadataCache metadataCache = server.apis().metadataCache();
- final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
- metadataCache.getPartitionInfo(topic, partition);
- if (partitionInfo.isEmpty()) {
- return false;
- }
- final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
- if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
- return false;
- }
+ TestUtils.waitForCondition(() -> {
+ for (final KafkaServer server : servers) {
+ final MetadataCache metadataCache = server.apis().metadataCache();
+ final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
+ metadataCache.getPartitionInfo(topic, partition);
+ if (partitionInfo.isEmpty()) {
+ return false;
+ }
+ final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
+ if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
+ return false;
}
- return true;
}
+ return true;
}, timeout, "metadata for topic=" + topic + " partition=" + partition
+ " not propagated to all brokers");
}
@@ -502,6 +518,28 @@ public class IntegrationTestUtils {
return consumedValues;
}
+ /**
+ * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+ * are already configured in the consumer).
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumer Kafka consumer
+ * @param waitTime Maximum wait time in milliseconds
+ * @param maxMessages Maximum number of messages to read via the consumer
+ * @return The KeyValue elements retrieved via the consumer
+ */
+ private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+ final Consumer<K, V> consumer,
+ final long waitTime,
+ final int maxMessages) {
+ final List<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<>();
+ final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
+ for (final ConsumerRecord<K, V> record : records) {
+ consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+ }
+ return consumedValues;
+ }
+
private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
final Consumer<K, V> consumer,
final long waitTime,