This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d12ceac  KAFKA-7158: Add unit test for window store range queries (#5466)
d12ceac is described below

commit d12ceacd7aff623ae8c9063f89655b1f54af8123
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed Aug 8 13:53:34 2018 -0700

    KAFKA-7158: Add unit test for window store range queries (#5466)
    
    While debugging the reported issue, I found that our current unit tests lack the coverage needed to expose the underlying root cause.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../state/internals/CachingWindowStoreTest.java    | 104 +++++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 3bcb1a2..0e7f88a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -16,18 +16,29 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -35,6 +46,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static 
org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
@@ -84,6 +97,97 @@ public class CachingWindowStoreTest {
     }
 
     @Test
+    public void shouldNotReturnDuplicatesInRanges() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final StoreBuilder<WindowStore<String, String>> storeBuilder = 
Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store-name", 3600000L, 60000L, 
false),
+            Serdes.String(),
+            Serdes.String())
+            .withCachingEnabled();
+
+        builder.addStateStore(storeBuilder);
+
+        builder.stream(topic,
+            Consumed.with(Serdes.String(), Serdes.String()))
+            .transform(() -> new Transformer<String, String, KeyValue<String, 
String>>() {
+                private WindowStore<String, String> store;
+                private int numRecordsProcessed;
+
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    this.store = (WindowStore<String, String>) 
processorContext.getStateStore("store-name");
+                    int count = 0;
+
+                    final KeyValueIterator<Windowed<String>, String> all = 
store.all();
+                    while (all.hasNext()) {
+                        count++;
+                        all.next();
+                    }
+
+                    assertThat(count, equalTo(0));
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String key, 
final String value) {
+                    int count = 0;
+
+                    final KeyValueIterator<Windowed<String>, String> all = 
store.all();
+                    while (all.hasNext()) {
+                        count++;
+                        all.next();
+                    }
+                    assertThat(count, equalTo(numRecordsProcessed));
+
+                    store.put(value, value);
+
+                    numRecordsProcessed++;
+
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+
+                }
+            }, "store-name");
+
+        final String bootstrapServers = "localhost:9092";
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"test-app");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 
1000);
+
+        final long initialWallClockTime = 0L;
+        final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
+
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(Serdes.String().serializer(), 
Serdes.String().serializer(), initialWallClockTime);
+
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, 
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        driver.advanceWallClockTime(10 * 1000L);
+        recordFactory.advanceTimeMs(10 * 1000L);
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, 
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        driver.advanceWallClockTime(10 * 1000L);
+        recordFactory.advanceTimeMs(10 * 1000L);
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, 
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+        driver.advanceWallClockTime(10 * 1000L);
+        recordFactory.advanceTimeMs(10 * 1000L);
+        for (int i = 0; i < 5; i++) {
+            driver.pipeInput(recordFactory.create(topic, 
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+        }
+    }
+
+    @Test
     public void shouldPutFetchFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));

Reply via email to