This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.10.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.1 by this push:
new cb5c33d  MINOR: Caching layer should forward record timestamp (#5423) (#5426)
cb5c33d is described below
commit cb5c33d55e7f80ddb9933fb411f5cd2da1377f4f
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Jul 26 13:10:24 2018 -0700
MINOR: Caching layer should forward record timestamp (#5423) (#5426)
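
The windowed caching layer used to stamp cached entries with the timestamp
argument handed to put() (the window timestamp), so entries flushed from the
cache were forwarded downstream with that timestamp instead of the timestamp
of the record currently being processed. A minimal before/after sketch of the
one-line change in CachingWindowStore#put, mirroring the hunk below:

    // before: the cache entry carried the window timestamp passed into put(key, value, timestamp)
    new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                      timestamp, context.partition(), context.topic());

    // after: the cache entry carries context.timestamp(), i.e. the timestamp of the
    // record being processed, so the original record timestamp is forwarded on flush
    new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                      context.timestamp(), context.partition(), context.topic());
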
Reviewer: Guozhang Wang <[email protected]>
---
.../state/internals/CachingWindowStore.java | 2 +-
.../KStreamAggregationIntegrationTest.java | 79 ++++++++++++++--------
.../integration/utils/IntegrationTestUtils.java | 78 +++++++++++++++++++++
3 files changed, 130 insertions(+), 29 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 304a206..8d631e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -145,7 +145,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
     public synchronized void put(final K key, final V value, final long timestamp) {
         final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(name, binaryKey, entry);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 383a793..8f0b63a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -185,6 +185,15 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
+    private static <K extends Comparable, V extends Comparable> int compareIgnoreTimestamp(final KeyValue<K, KeyValue<V, Long>> o1,
+                                                                                            final KeyValue<K, KeyValue<V, Long>> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.key.compareTo(o2.value.key);
+        }
+        return keyComparison;
+    }
+
     @Test
     public void shouldReduceWindowed() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -310,18 +319,18 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
             new StringDeserializer(),
             new IntegerDeserializer(),
             15);
 
-        final Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, KeyValue<Integer, Long>>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<String, KeyValue<Integer, Long>>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<String, KeyValue<Integer, Long>> o1,
+                                   final KeyValue<String, KeyValue<Integer, Long>> o2) {
+                    return KStreamAggregationIntegrationTest.compareIgnoreTimestamp(o1, o2);
                 }
             };
 
@@ -332,21 +341,21 @@ public class KStreamAggregationIntegrationTest {
 
         assertThat(windowedMessages, is(
             Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
+                new KeyValue<>("A@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("B@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("C@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("D@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("E@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(2, secondTimestamp))
             )));
     }
 
@@ -462,16 +471,13 @@ public class KStreamAggregationIntegrationTest {
         kafkaStreams.start();
     }
 
-
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                             keyDeserializer,
-                                                         final Deserializer<V>
-                                                             valueDeserializer,
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
@@ -481,7 +487,24 @@ public class KStreamAggregationIntegrationTest {
             outputTopic,
             numMessages,
             60 * 1000);
+    }
+
+    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+                                                                                      final Deserializer<V> valueDeserializer,
+                                                                                      final int numMessages)
+        throws InterruptedException {
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 117e6ff..91a6ba2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.integration.utils;
 
 import kafka.utils.Time;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -213,6 +215,37 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    /**
+     * Wait until enough data (key-value records) has been consumed.
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime           Upper bound in waiting time in milliseconds
+     * @return All the records consumed, or null if no records are consumed
+     * @throws AssertionError if the given wait time elapses
+     */
+    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                                final String topic,
+                                                                                                                final int expectedNumRecords,
+                                                                                                                final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                        readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -253,4 +286,49 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic       Kafka topic to read messages from
+     * @param consumer    Kafka consumer
+     * @param waitTime    Maximum wait time in milliseconds
+     * @param maxMessages Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+                                                                                           final Consumer<K, V> consumer,
+                                                                                           final long waitTime,
+                                                                                           final int maxMessages) {
+        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues;
+        consumer.subscribe(Collections.singletonList(topic));
+        final int pollIntervalMs = 100;
+        consumedValues = new ArrayList<>();
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime &&
+            continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
+            for (final ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+            }
+        }
+        return consumedValues;
+    }
+
+    /**
+     * Sets up a {@link KafkaConsumer} from a copy of the given configuration that has
+     * {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
+     * set to "true" to prevent missing events as well as repeat consumption.
+     * @param consumerConfig Consumer configuration
+     * @return Consumer
+     */
+    private static <K, V> KafkaConsumer<K, V> createConsumer(final Properties consumerConfig) {
+        final Properties filtered = new Properties();
+        filtered.putAll(consumerConfig);
+        filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        return new KafkaConsumer<>(filtered);
+    }
 }
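
For reference, a minimal usage sketch of the new test helper (a hypothetical,
abbreviated consumer setup; CLUSTER and outputTopic stand in for the fixtures
used in KStreamAggregationIntegrationTest above, and the group id is made up):

    // Read at least 15 records from the output topic, keeping each record's
    // timestamp next to its key and value so a test can assert on all three.
    final Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-forwarding-test");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());

    final List<KeyValue<String, KeyValue<Integer, Long>>> records =
        IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(props, outputTopic, 15, 60 * 1000);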