This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 27224a3 KAFKA-7158: Add unit test for window store range queries
(#5466)
27224a3 is described below
commit 27224a38d923cb28f650710e0af4a35d785e3025
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed Aug 8 13:53:34 2018 -0700
KAFKA-7158: Add unit test for window store range queries (#5466)
While debugging the reported issue, I found that our current unit test
lacks coverage to actually expose the underlying root cause.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
minor fix
---
.../state/internals/CachingWindowStoreTest.java | 104 +++++++++++++++++++++
1 file changed, 104 insertions(+)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index b8808ca..551aeb1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -16,20 +16,31 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -38,6 +49,8 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
import static
org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
import static org.apache.kafka.test.StreamsTestUtils.toList;
@@ -91,6 +104,97 @@ public class CachingWindowStoreTest {
}
@Test
+    public void shouldNotReturnDuplicatesInRanges() {
+        // KAFKA-7158: each record's value is written to a caching-enabled persistent window store
+        // under a freshly generated (random UUID) key. Before every write the transformer checks
+        // that store.all() yields exactly one entry per record processed so far, so an entry that
+        // is returned twice makes the count assertion fail.
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store-name", 3600000L, 3, 60000L, false),
+            Serdes.String(),
+            Serdes.String())
+            .withCachingEnabled();
+
+        builder.addStateStore(storeBuilder);
+
+        builder.stream(topic,
+            Consumed.with(Serdes.String(), Serdes.String()))
+            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+                private WindowStore<String, String> store;
+                private int numRecordsProcessed;
+
+                // The store is registered above as WindowStore<String, String>, so the cast is safe.
+                @SuppressWarnings("unchecked")
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name");
+
+                    // Nothing has been written yet, so the store must start out empty.
+                    assertThat(countStoreEntries(), equalTo(0));
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String key, final String value) {
+                    // One entry per previously processed record; anything more is a duplicate.
+                    assertThat(countStoreEntries(), equalTo(numRecordsProcessed));
+
+                    store.put(value, value);
+
+                    numRecordsProcessed++;
+
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+                    // nothing to release: the store is owned by the topology
+                }
+
+                // Counts the entries returned by store.all(), always closing the iterator afterwards.
+                private int countStoreEntries() {
+                    int count = 0;
+                    try (final KeyValueIterator<Windowed<String>, String> all = store.all()) {
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
+                    }
+                    return count;
+                }
+            }, "store-name");
+
+        final String bootstrapServers = "localhost:9092";
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
+
+        final long initialWallClockTime = 0L;
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
+            Serdes.String().serializer(), Serdes.String().serializer(), initialWallClockTime);
+
+        // Close the driver even when an assertion fails so its resources are released.
+        try (final TopologyTestDriver driver =
+                 new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime)) {
+            // Four batches of five records; wall-clock and record time advance by 10 s between batches.
+            for (int batch = 0; batch < 4; batch++) {
+                if (batch > 0) {
+                    driver.advanceWallClockTime(10 * 1000L);
+                    recordFactory.advanceTimeMs(10 * 1000L);
+                }
+                for (int i = 0; i < 5; i++) {
+                    driver.pipeInput(recordFactory.create(topic, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+                }
+            }
+        }
+    }
+
+ @Test
public void shouldPutFetchFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("b"), bytesValue("b"));