This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
     new 3b6e0f3  MINOR: Caching layer should forward record timestamp (#5423) (#5426)
3b6e0f3 is described below
commit 3b6e0f3380f790e2705311a04550568ff2777c99
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Jul 26 13:10:24 2018 -0700
MINOR: Caching layer should forward record timestamp (#5423) (#5426)
Reviewer: Guozhang Wang <[email protected]>
---
.../state/internals/CachingSessionStore.java | 2 +-
.../state/internals/CachingWindowStore.java | 2 +-
.../KStreamAggregationIntegrationTest.java | 154 ++++++++++++++-------
.../integration/utils/IntegrationTestUtils.java | 62 +++++++++
4 files changed, 165 insertions(+), 55 deletions(-)
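The substance of the fix is a single argument in each caching store: the LRUCacheEntry written into the cache used to be stamped with the session window end (CachingSessionStore) or with the timestamp argument that is also used to build the store key (CachingWindowStore); it is now stamped with context.timestamp(), the timestamp of the record currently being processed, so entries flushed from the cache are forwarded downstream carrying the input record's timestamp. A minimal consumer-side check of the new behavior, as a sketch only (the bootstrap server, topic name, and deserializers below are assumptions, not part of this commit):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TimestampCheck {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-check");
            config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            try (final KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(Collections.singletonList("output")); // assumed aggregation output topic
                for (final ConsumerRecord<String, Long> record : consumer.poll(1000L)) {
                    // with the fix, this timestamp is the input record's timestamp, not
                    // the window end (session store) or store key timestamp (window store)
                    System.out.println(record.key() + " -> " + record.value() + " @ " + record.timestamp());
                }
            }
        }
    }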
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 022f6f3..cb0cb25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -143,7 +143,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         validateStoreOpen();
         final Bytes binaryKey = SessionKeySerde.bytesToBinary(key);
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      key.window().end(), context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index a78978b..99c3e7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -152,7 +152,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, bytesSerdes);
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4527c19..3afded5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -45,11 +45,16 @@ import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
@@ -196,6 +201,15 @@ public class KStreamAggregationIntegrationTest {
return keyComparison;
}
+    private static <K extends Comparable, V extends Comparable> int compareIgnoreTimestamp(final KeyValue<K, KeyValue<V, Long>> o1,
+                                                                                           final KeyValue<K, KeyValue<V, Long>> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.key.compareTo(o2.value.key);
+        }
+        return keyComparison;
+    }
+
@Test
public void shouldReduceWindowed() throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -325,18 +339,18 @@ public class KStreamAggregationIntegrationTest {
startStreams();
-        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
new StringDeserializer(),
new IntegerDeserializer(),
15);
-        final Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, KeyValue<Integer, Long>>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<String, KeyValue<Integer, Long>>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<String, KeyValue<Integer, Long>> o1,
+                                   final KeyValue<String, KeyValue<Integer, Long>> o2) {
+                    return KStreamAggregationIntegrationTest.compareIgnoreTimestamp(o1, o2);
                 }
             };
@@ -347,21 +361,21 @@ public class KStreamAggregationIntegrationTest {
assertThat(windowedMessages, is(
Arrays.asList(
- new KeyValue<>("A@" + firstWindow, 1),
- new KeyValue<>("A@" + secondWindow, 1),
- new KeyValue<>("A@" + secondWindow, 2),
- new KeyValue<>("B@" + firstWindow, 1),
- new KeyValue<>("B@" + secondWindow, 1),
- new KeyValue<>("B@" + secondWindow, 2),
- new KeyValue<>("C@" + firstWindow, 1),
- new KeyValue<>("C@" + secondWindow, 1),
- new KeyValue<>("C@" + secondWindow, 2),
- new KeyValue<>("D@" + firstWindow, 1),
- new KeyValue<>("D@" + secondWindow, 1),
- new KeyValue<>("D@" + secondWindow, 2),
- new KeyValue<>("E@" + firstWindow, 1),
- new KeyValue<>("E@" + secondWindow, 1),
- new KeyValue<>("E@" + secondWindow, 2)
+ new KeyValue<>("A@" + firstWindow, KeyValue.pair(1,
firstTimestamp)),
+ new KeyValue<>("A@" + secondWindow, KeyValue.pair(1,
secondTimestamp)),
+ new KeyValue<>("A@" + secondWindow, KeyValue.pair(2,
secondTimestamp)),
+ new KeyValue<>("B@" + firstWindow, KeyValue.pair(1,
firstTimestamp)),
+ new KeyValue<>("B@" + secondWindow, KeyValue.pair(1,
secondTimestamp)),
+ new KeyValue<>("B@" + secondWindow, KeyValue.pair(2,
secondTimestamp)),
+ new KeyValue<>("C@" + firstWindow, KeyValue.pair(1,
firstTimestamp)),
+ new KeyValue<>("C@" + secondWindow, KeyValue.pair(1,
secondTimestamp)),
+ new KeyValue<>("C@" + secondWindow, KeyValue.pair(2,
secondTimestamp)),
+ new KeyValue<>("D@" + firstWindow, KeyValue.pair(1,
firstTimestamp)),
+ new KeyValue<>("D@" + secondWindow, KeyValue.pair(1,
secondTimestamp)),
+ new KeyValue<>("D@" + secondWindow, KeyValue.pair(2,
secondTimestamp)),
+ new KeyValue<>("E@" + firstWindow, KeyValue.pair(1,
firstTimestamp)),
+ new KeyValue<>("E@" + secondWindow, KeyValue.pair(1,
secondTimestamp)),
+ new KeyValue<>("E@" + secondWindow, KeyValue.pair(2,
secondTimestamp))
)));
}
@@ -400,8 +414,9 @@ public class KStreamAggregationIntegrationTest {
public void shouldCount() throws Exception {
produceMessages(mockTime.milliseconds());
- groupedStream.count("count-by-key")
- .to(Serdes.String(), Serdes.Long(), outputTopic);
+        groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-by-key"))
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
shouldCountHelper();
}
@@ -524,30 +539,51 @@ public class KStreamAggregationIntegrationTest {
new Properties()),
t4);
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .count(SessionWindows.with(sessionGap).until(maintainMillis))
                 .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                .transform(new TransformerSupplier<Windowed<String>, Long, KeyValue<Object, Object>>() {
                     @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                        latch.countDown();
+                    public Transformer<Windowed<String>, Long, KeyValue<Object, Object>> get() {
+                        return new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
+                            private ProcessorContext context;
+
+                            @Override
+                            public void init(final ProcessorContext context) {
+                                this.context = context;
+                            }
+
+                            @Override
+                            public KeyValue<Object, Object> transform(final Windowed<String> key, final Long value) {
+                                results.put(key, KeyValue.pair(value, context.timestamp()));
+                                latch.countDown();
+                                return null;
+                            }
+
+                            @Override
+                            public KeyValue<Object, Object> punctuate(final long timestamp) {
+                                return null;
+                            }
+
+                            @Override
+                            public void close() {}
+                        };
                     }
                 });
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4)));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2)));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
}
@SuppressWarnings("deprecation")
@@ -617,19 +653,20 @@ public class KStreamAggregationIntegrationTest {
final String userSessionsStore = "UserSessionsStore";
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
                 .reduce(new Reducer<String>() {
                     @Override
                     public String apply(final String value1, final String value2) {
                         return value1 + ":" + value2;
                     }
-                }, SessionWindows.with(sessionGap).until(maintainMillis), userSessionsStore)
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                        latch.countDown();
-                    }
-                });
+                }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(userSessionsStore).withValueSerde(Serdes.String()))
+                .foreach(new ForeachAction<Windowed<String>, String>() {
+                    @Override
+                    public void apply(final Windowed<String> key, final String value) {
+                        results.put(key, value);
+                        latch.countDown();
+                    }
+                });
startStreams();
latch.await(30, TimeUnit.SECONDS);
@@ -650,10 +687,8 @@ public class KStreamAggregationIntegrationTest {
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new
SessionWindow(t1, t1)), "start")));
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new
SessionWindow(t3, t4)), "pause:resume")));
assertFalse(bob.hasNext());
-
}
-
private void produceMessages(final long timestamp) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
streamOneInput,
@@ -671,7 +706,6 @@ public class KStreamAggregationIntegrationTest {
timestamp);
}
-
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
outputTopic = "output-" + testNo;
@@ -685,16 +719,13 @@ public class KStreamAggregationIntegrationTest {
kafkaStreams.start();
}
-
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                            keyDeserializer,
-                                                        final Deserializer<V>
-                                                            valueDeserializer,
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
@@ -704,7 +735,24 @@ public class KStreamAggregationIntegrationTest {
outputTopic,
numMessages,
60 * 1000);
+ }
+    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+                                                                                     final Deserializer<V> valueDeserializer,
+                                                                                     final int numMessages)
+        throws InterruptedException {
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 304a3e5..10d4083 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -217,6 +217,37 @@ public class IntegrationTestUtils {
return accumData;
}
+ /**
+ * Wait until enough data (key-value records) has been consumed.
+ *
+ * @param consumerConfig Kafka Consumer configuration
+ * @param topic Topic to consume from
+ * @param expectedNumRecords Minimum number of expected records
+ * @param waitTime Upper bound in waiting time in milliseconds
+ * @return All the records consumed, or null if no records are consumed
+ * @throws AssertionError if the given wait time elapses
+ */
+    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                               final String topic,
+                                                                                                               final int expectedNumRecords,
+                                                                                                               final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                        readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -382,6 +413,37 @@ public class IntegrationTestUtils {
return consumedValues;
}
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+                                                                                          final Consumer<K, V> consumer,
+                                                                                          final long waitTime,
+                                                                                          final int maxMessages) {
+        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues;
+        consumer.subscribe(Collections.singletonList(topic));
+        final int pollIntervalMs = 100;
+        consumedValues = new ArrayList<>();
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime &&
+            continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
+            for (final ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+            }
+        }
+        return consumedValues;
+    }
+
     private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
return maxMessages <= 0 || messagesConsumed < maxMessages;
}
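For reference, a usage sketch of the new IntegrationTestUtils helper from inside an integration test (the topic name, group id, deserializers, and expected record count below are assumptions; each returned element pairs a key with (value, record timestamp)):

    final Properties config = new Properties();
    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); // embedded test cluster, as in the tests above
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "with-timestamp-test");
    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

    // block until at least 15 records arrive on "output", or fail after 60s
    final List<KeyValue<String, KeyValue<Long, Long>>> received =
        IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(config, "output", 15, 60 * 1000);
    final long firstRecordTimestamp = received.get(0).value.value;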