This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 732bffcae6a KAFKA-15569: test and add test cases in 
IQv2StoreIntegrationTest (#14523)
732bffcae6a is described below

commit 732bffcae6ad049d894e4dffe1907e8ceeb74a60
Author: Hanyu Zheng <[email protected]>
AuthorDate: Mon Oct 16 17:30:05 2023 -0700

    KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest (#14523)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../integration/IQv2StoreIntegrationTest.java      | 651 ++++++++++++++++++---
 1 file changed, 576 insertions(+), 75 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index bde36508321..26d86896296 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -140,7 +140,7 @@ public class IQv2StoreIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
     private static final Position POSITION_0 =
-        Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 
1L)))));
+        Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 
5L)))));
 
     public static class UnknownQuery implements Query<Void> { }
 
@@ -408,13 +408,15 @@ public class IQv2StoreIntegrationTest {
 
         final List<Future<RecordMetadata>> futures = new LinkedList<>();
         try (final Producer<Integer, Integer> producer = new 
KafkaProducer<>(producerProps)) {
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < 10; i++) {
+                final int key = i / 2;
+                final int partition = key % partitions;
                 final Future<RecordMetadata> send = producer.send(
                     new ProducerRecord<>(
                         INPUT_TOPIC_NAME,
-                        i % partitions,
-                        RECORD_TIME,
-                        i,
+                        partition,
+                        WINDOW_START + Duration.ofMinutes(2).toMillis() * i,
+                        key,
                         i,
                         null
                     )
@@ -438,8 +440,8 @@ public class IQv2StoreIntegrationTest {
         assertThat(INPUT_POSITION, equalTo(
             Position
                 .emptyPosition()
-                .withComponent(INPUT_TOPIC_NAME, 0, 1L)
-                .withComponent(INPUT_TOPIC_NAME, 1, 1L)
+                .withComponent(INPUT_TOPIC_NAME, 0, 5L)
+                .withComponent(INPUT_TOPIC_NAME, 1, 3L)
         ));
     }
 
@@ -650,12 +652,13 @@ public class IQv2StoreIntegrationTest {
                 public void process(final Record<Integer, Integer> record) {
                     final TimestampedWindowStore<Integer, Integer> stateStore =
                         
context().getStateStore(windowStoreStoreBuilder.name());
+                    // We don't re-implement the DSL logic (which implements 
sum) but instead just keep the latest value per window
                     stateStore.put(
                         record.key(),
                         ValueAndTimestamp.make(
                             record.value(), record.timestamp()
                         ),
-                        WINDOW_START
+                        (record.timestamp() / WINDOW_SIZE.toMillis()) * 
WINDOW_SIZE.toMillis()
                     );
                 }
             };
@@ -671,7 +674,8 @@ public class IQv2StoreIntegrationTest {
                     public void process(final Record<Integer, Integer> record) 
{
                         final WindowStore<Integer, Integer> stateStore =
                             
context().getStateStore(windowStoreStoreBuilder.name());
-                        stateStore.put(record.key(), record.value(), 
WINDOW_START);
+                        // We don't re-implement the DSL logic (which 
implements sum) but instead just keep the latest value per window
+                        stateStore.put(record.key(), record.value(), 
(record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis());
                     }
                 };
         }
@@ -716,7 +720,9 @@ public class IQv2StoreIntegrationTest {
                 final SessionStore<Integer, Integer> stateStore =
                     context().getStateStore(sessionStoreStoreBuilder.name());
                 stateStore.put(
-                    new Windowed<>(record.key(), new 
SessionWindow(WINDOW_START, WINDOW_START)),
+                    // we do not re-implement the actual session-window logic 
from the DSL here to keep the test simple,
+                    // but instead just put each record into its own session
+                    new Windowed<>(record.key(), new 
SessionWindow(record.timestamp(), record.timestamp())),
                     record.value()
                 );
             }
@@ -772,17 +778,27 @@ public class IQv2StoreIntegrationTest {
                 shouldRejectUnknownQuery();
                 shouldCollectExecutionInfo();
                 shouldCollectExecutionInfoUnderFailure();
-
+                final String kind = this.kind;
                 if (storeToTest.keyValue()) {
                     if (storeToTest.timestamped()) {
                         final Function<ValueAndTimestamp<Integer>, Integer> 
valueExtractor =
                             ValueAndTimestamp::value;
-                        shouldHandleKeyQuery(2, valueExtractor, 2);
-                        shouldHandleRangeQueries(valueExtractor);
+                        if (kind.equals("DSL")) {
+                            shouldHandleKeyQuery(2, valueExtractor, 5);
+                            shouldHandleRangeDSLQueries(valueExtractor);
+                        } else {
+                            shouldHandleKeyQuery(2, valueExtractor, 5);
+                            shouldHandleRangePAPIQueries(valueExtractor);
+                        }
                     } else {
                         final Function<Integer, Integer> valueExtractor = 
Function.identity();
-                        shouldHandleKeyQuery(2, valueExtractor, 2);
-                        shouldHandleRangeQueries(valueExtractor);
+                        if (kind.equals("DSL")) {
+                            shouldHandleKeyQuery(2, valueExtractor, 5);
+                            shouldHandleRangeDSLQueries(valueExtractor);
+                        } else {
+                            shouldHandleKeyQuery(2, valueExtractor, 5);
+                            shouldHandleRangePAPIQueries(valueExtractor);
+                        }
                     }
                 }
 
@@ -790,19 +806,33 @@ public class IQv2StoreIntegrationTest {
                     if (storeToTest.timestamped()) {
                         final Function<ValueAndTimestamp<Integer>, Integer> 
valueExtractor =
                             ValueAndTimestamp::value;
-                        shouldHandleWindowKeyQueries(valueExtractor);
-                        shouldHandleWindowRangeQueries(valueExtractor);
+                        if (kind.equals("DSL")) {
+                            shouldHandleWindowKeyDSLQueries(valueExtractor);
+                            shouldHandleWindowRangeDSLQueries(valueExtractor);
+                        } else {
+                            shouldHandleWindowKeyPAPIQueries(valueExtractor);
+                            shouldHandleWindowRangePAPIQueries(valueExtractor);
+                        }
                     } else {
                         final Function<Integer, Integer> valueExtractor = 
Function.identity();
-                        shouldHandleWindowKeyQueries(valueExtractor);
-                        shouldHandleWindowRangeQueries(valueExtractor);
+                        if (kind.equals("DSL")) {
+                            shouldHandleWindowKeyDSLQueries(valueExtractor);
+                            shouldHandleWindowRangeDSLQueries(valueExtractor);
+                        } else {
+                            shouldHandleWindowKeyPAPIQueries(valueExtractor);
+                            shouldHandleWindowRangePAPIQueries(valueExtractor);
+                        }
                     }
                 }
 
                 if (storeToTest.isSession()) {
                     // Note there's no "timestamped" differentiation here.
                     // Idiosyncratically, SessionStores are _never_ 
timestamped.
-                    shouldHandleSessionKeyQueries();
+                    if (kind.equals("DSL")) {
+                        shouldHandleSessionKeyDSLQueries();
+                    } else {
+                        shouldHandleSessionKeyPAPIQueries();
+                    }
                 }
             }
         } catch (final AssertionError e) {
@@ -812,57 +842,211 @@ public class IQv2StoreIntegrationTest {
     }
 
 
-    private <T> void shouldHandleRangeQueries(final Function<T, Integer> 
extractor) {
+    private <T> void shouldHandleRangeDSLQueries(final Function<T, Integer> 
extractor) {
+        shouldHandleRangeQuery(
+            Optional.of(0),
+            Optional.of(4),
+            extractor,
+            mkSet(1, 3, 5, 7, 9)
+        );
+
         shouldHandleRangeQuery(
             Optional.of(1),
             Optional.of(3),
             extractor,
-            mkSet(1, 2, 3)
-
+            mkSet(3, 5, 7)
         );
+
         shouldHandleRangeQuery(
-            Optional.of(1),
+            Optional.of(3),
             Optional.empty(),
             extractor,
-            mkSet(1, 2, 3)
+            mkSet(7, 9)
+        );
 
+        shouldHandleRangeQuery(
+            Optional.empty(),
+            Optional.of(3),
+            extractor,
+            mkSet(1, 3, 5, 7)
         );
+
         shouldHandleRangeQuery(
             Optional.empty(),
-            Optional.of(1),
+            Optional.empty(),
             extractor,
-            mkSet(0, 1)
+            mkSet(1, 3, 5, 7, 9)
+        );
+    }
+
+    private <T> void shouldHandleRangePAPIQueries(final Function<T, Integer> 
extractor) {
+        shouldHandleRangeQuery(
+            Optional.of(0),
+            Optional.of(4),
+            extractor,
+            mkSet(1, 3, 5, 7, 9)
+        );
 
+        shouldHandleRangeQuery(
+            Optional.of(1),
+            Optional.of(3),
+            extractor,
+            mkSet(3, 5, 7)
         );
+
         shouldHandleRangeQuery(
+            Optional.of(3),
             Optional.empty(),
+            extractor,
+            mkSet(7, 9)
+        );
+
+        shouldHandleRangeQuery(
             Optional.empty(),
+            Optional.of(3),
             extractor,
-            mkSet(0, 1, 2, 3)
+            mkSet(1, 3, 5, 7)
+        );
 
+        shouldHandleRangeQuery(
+            Optional.empty(),
+            Optional.empty(),
+            extractor,
+            mkSet(1, 3, 5, 7, 9)
         );
     }
 
-    private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer> 
extractor) {
+    private <T> void shouldHandleWindowKeyDSLQueries(final Function<T, 
Integer> extractor) {
 
         // tightest possible start range
+        shouldHandleWindowKeyQuery(
+            0,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START),
+            extractor,
+            mkSet(1)
+        );
+
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            0,
+            Instant.ofEpochMilli(WINDOW_START - 1),
+            Instant.ofEpochMilli(WINDOW_START - 1),
+            extractor,
+            mkSet()
+        );
+
+        // do the window key query at the first window and the key of record 
which we want to query is 2
         shouldHandleWindowKeyQuery(
             2,
             Instant.ofEpochMilli(WINDOW_START),
             Instant.ofEpochMilli(WINDOW_START),
             extractor,
+            mkSet()
+        );
+
+        // miss the key
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START),
+            extractor,
+            mkSet()
+        );
+
+        // miss both
+        shouldHandleWindowKeyQuery(
+            999,
+            Instant.ofEpochMilli(WINDOW_START - 1),
+            Instant.ofEpochMilli(WINDOW_START - 1),
+            extractor,
+            mkSet()
+        );
+
+        // do the window key query at the first and the second windows and the 
key of record which we want to query is 0
+        shouldHandleWindowKeyQuery(
+            0,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+            extractor,
+            mkSet(1)
+        );
+
+        // do the window key query at the first window and the key of record 
which we want to query is 1
+        shouldHandleWindowKeyQuery(
+            1,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START),
+            extractor,
             mkSet(2)
         );
 
-        // miss the window start range
+        // do the window key query at the second and the third windows and the 
key of record which we want to query is 2
         shouldHandleWindowKeyQuery(
             2,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(10).toMillis()),
+            extractor,
+            mkSet(4, 5)
+        );
+
+        // do the window key query at the second and the third windows and the 
key of record which we want to query is 3
+        shouldHandleWindowKeyQuery(
+            3,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(10).toMillis()),
+            extractor,
+            mkSet(13)
+        );
+
+        // do the window key query at the fourth and the fifth windows and the 
key of record which we want to query is 4
+        shouldHandleWindowKeyQuery(
+            4,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(15).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(20).toMillis()),
+            extractor,
+            mkSet(17)
+        );
+
+        // do the window key query at the fifth window and the key of record 
which we want to query is 4
+        shouldHandleWindowKeyQuery(
+            4,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(20).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(24).toMillis()),
+            extractor,
+            mkSet()
+        );
+    }
+
+    private <T> void shouldHandleWindowKeyPAPIQueries(final Function<T, 
Integer> extractor) {
+
+        // tightest possible start range
+        shouldHandleWindowKeyQuery(
+            0,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START),
+            extractor,
+            mkSet(1)
+        );
+
+        // miss the window start range
+        shouldHandleWindowKeyQuery(
+            0,
             Instant.ofEpochMilli(WINDOW_START - 1),
             Instant.ofEpochMilli(WINDOW_START - 1),
             extractor,
             mkSet()
         );
 
+        // do the window key query at the first window and the key of record 
which we want to query is 2
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START),
+            extractor,
+            mkSet()
+        );
+
         // miss the key
         shouldHandleWindowKeyQuery(
             999,
@@ -880,19 +1064,179 @@ public class IQv2StoreIntegrationTest {
             extractor,
             mkSet()
         );
+
+        // do the window key query at the first and the second windows and the 
key of record which we want to query is 0
+        shouldHandleWindowKeyQuery(
+            0,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+            extractor,
+            mkSet(1)
+        );
+
+        // do the window key query at the first window and the key of record 
which we want to query is 1
+        shouldHandleWindowKeyQuery(
+            1,
+            Instant.ofEpochMilli(WINDOW_START),
+            Instant.ofEpochMilli(WINDOW_START),
+            extractor,
+            mkSet(2)
+        );
+
+        // do the window key query at the second and the third windows and the 
key of record which we want to query is 2
+        shouldHandleWindowKeyQuery(
+            2,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(10).toMillis()),
+            extractor,
+            mkSet(4, 5)
+        );
+
+        // do the window key query at the second and the third windows and the 
key of record which we want to query is 3
+        shouldHandleWindowKeyQuery(
+            3,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(10).toMillis()),
+            extractor,
+            mkSet(7)
+        );
+
+        // do the window key query at the fourth and the fifth windows and the 
key of record which we want to query is 4
+        shouldHandleWindowKeyQuery(
+            4,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(15).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(20).toMillis()),
+            extractor,
+            mkSet(9)
+        );
+
+        // do the window key query at the fifth window and the key of record 
which we want to query is 4
+        shouldHandleWindowKeyQuery(
+            4,
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(20).toMillis()),
+            Instant.ofEpochMilli(WINDOW_START + 
Duration.ofMinutes(24).toMillis()),
+            extractor,
+            mkSet()
+        );
     }
 
-    private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer> 
extractor) {
+    private <T> void shouldHandleWindowRangeDSLQueries(final Function<T, 
Integer> extractor) {
         final long windowSize = WINDOW_SIZE.toMillis();
         final long windowStart = (RECORD_TIME / windowSize) * windowSize;
 
+        // miss the window start
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart - 1),
+            Instant.ofEpochMilli(windowStart - 1),
+            extractor,
+            mkSet()
+        );
+
+        // do the query at the first window
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(1, 2)
+        );
+
+        // do the query at the first and the second windows
         shouldHandleWindowRangeQuery(
             Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
+            extractor,
+            mkSet(1, 2, 3, 4)
+        );
+
+        // do the query at the second and the third windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(10).toMillis()),
+            extractor,
+            mkSet(3, 4, 5, 13)
+        );
+
+        // do the query at the third and the fourth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(10).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            extractor,
+            mkSet(17, 5, 13)
+        );
+
+        // do the query at the fourth and the fifth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(20).toMillis()),
+            extractor,
+            mkSet(17)
+        );
+
+        //do the query at the fifth and the sixth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(20).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(25).toMillis()),
+            extractor,
+            mkSet()
+        );
+
+        // do the query from the second to the fourth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            extractor,
+            mkSet(17, 3, 4, 5, 13)
+        );
+
+        // do the query from the first to the fourth windows
+        shouldHandleWindowRangeQuery(
             Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
             extractor,
-            mkSet(0, 1, 2, 3)
+            mkSet(1, 17, 2, 3, 4, 5, 13)
         );
 
+        // Should fail to execute this query on a WindowStore.
+        final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
+
+        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> 
request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, 
T>>> queryResult =
+                result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final QueryResult<KeyValueIterator<Windowed<Integer>, T>> 
partitionResult =
+                    queryResult.get(partition);
+                final boolean failure = partitionResult.isFailure();
+                if (!failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(partitionResult.getFailureReason(), 
is(FailureReason.UNKNOWN_QUERY_TYPE));
+                assertThat(partitionResult.getFailureMessage(), matchesPattern(
+                    "This store"
+                        + " \\(class 
org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
+                        + " doesn't know how to execute the given query"
+                        + " \\(WindowRangeQuery\\{key=Optional\\[2], 
timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+                        + " because WindowStores only supports 
WindowRangeQuery.withWindowStartRange\\."
+                        + " Contact the store maintainer if you need support 
for a new query type\\."
+                ));
+            }
+        }
+    }
+
+    private <T> void shouldHandleWindowRangePAPIQueries(final Function<T, 
Integer> extractor) {
+        final long windowSize = WINDOW_SIZE.toMillis();
+        final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
         // miss the window start
         shouldHandleWindowRangeQuery(
             Instant.ofEpochMilli(windowStart - 1),
@@ -901,17 +1245,81 @@ public class IQv2StoreIntegrationTest {
             mkSet()
         );
 
+        // do the query at the first window
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart),
+            extractor,
+            mkSet(1, 2)
+        );
+
+        // do the query at the first and the second windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
+            extractor,
+            mkSet(1, 2, 3, 4)
+        );
+
+        // do the query at the second and the third windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(10).toMillis()),
+            extractor,
+            mkSet(3, 4, 5, 7)
+        );
+
+        // do the query at the third and the fourth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(10).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            extractor,
+            mkSet(5, 7, 9)
+        );
+
+        // do the query at the fourth and the fifth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(20).toMillis()),
+            extractor,
+            mkSet(9)
+        );
+
+        //do the query at the fifth and the sixth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(20).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(25).toMillis()),
+            extractor,
+            mkSet()
+        );
+
+        // do the query from the second to the fourth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(5).toMillis()),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            extractor,
+            mkSet(3, 4, 5, 7, 9)
+        );
+
+        // do the query from the first to the fourth windows
+        shouldHandleWindowRangeQuery(
+            Instant.ofEpochMilli(windowStart),
+            Instant.ofEpochMilli(windowStart + 
Duration.ofMinutes(15).toMillis()),
+            extractor,
+            mkSet(1, 2, 3, 4, 5, 7, 9)
+        );
+
         // Should fail to execute this query on a WindowStore.
         final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
 
         final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> 
request =
-                inStore(STORE_NAME)
-                        .withQuery(query)
-                        .withPartitions(mkSet(0, 1))
-                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
-                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         if (result.getGlobalResult() != null) {
             fail("global tables aren't implemented");
@@ -938,10 +1346,103 @@ public class IQv2StoreIntegrationTest {
         }
     }
 
-    private <T> void shouldHandleSessionKeyQueries() {
+    private <T> void shouldHandleSessionKeyDSLQueries() {
+        shouldHandleSessionRangeQuery(
+            0,
+            mkSet(1)
+        );
+
+        shouldHandleSessionRangeQuery(
+            1,
+            mkSet(5)
+        );
+
         shouldHandleSessionRangeQuery(
             2,
-            mkSet(2)
+            mkSet(9)
+        );
+
+        shouldHandleSessionRangeQuery(
+            3,
+            mkSet(13)
+        );
+
+        shouldHandleSessionRangeQuery(
+            4,
+            mkSet(17)
+        );
+
+        // not present, so empty result iter
+        shouldHandleSessionRangeQuery(
+            999,
+            mkSet()
+        );
+
+        // Should fail to execute this query on a SessionStore.
+        final WindowRangeQuery<Integer, T> query =
+            WindowRangeQuery.withWindowStartRange(
+                Instant.ofEpochMilli(0L),
+                Instant.ofEpochMilli(0L)
+            );
+
+        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>> 
request =
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+        final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+        if (result.getGlobalResult() != null) {
+            fail("global tables aren't implemented");
+        } else {
+            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, 
T>>> queryResult =
+                result.getPartitionResults();
+            for (final int partition : queryResult.keySet()) {
+                final QueryResult<KeyValueIterator<Windowed<Integer>, T>> 
partitionResult =
+                    queryResult.get(partition);
+                final boolean failure = partitionResult.isFailure();
+                if (!failure) {
+                    throw new AssertionError(queryResult.toString());
+                }
+                assertThat(partitionResult.getFailureReason(), 
is(FailureReason.UNKNOWN_QUERY_TYPE));
+                assertThat(partitionResult.getFailureMessage(), is(
+                    "This store"
+                        + " (class 
org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+                        + " doesn't know how to execute the given query"
+                        + " (WindowRangeQuery{key=Optional.empty, 
timeFrom=Optional[1970-01-01T00:00:00Z], 
timeTo=Optional[1970-01-01T00:00:00Z]})"
+                        + " because SessionStores only support 
WindowRangeQuery.withKey."
+                        + " Contact the store maintainer if you need support 
for a new query type."
+                ));
+            }
+        }
+    }
+
+    private <T> void shouldHandleSessionKeyPAPIQueries() {
+        shouldHandleSessionRangeQuery(
+            0,
+            mkSet(0, 1)
+        );
+
+        shouldHandleSessionRangeQuery(
+            1,
+            mkSet(2, 3)
+        );
+
+        shouldHandleSessionRangeQuery(
+            2,
+            mkSet(4, 5)
+        );
+
+        shouldHandleSessionRangeQuery(
+            3,
+            mkSet(6, 7)
+        );
+
+        shouldHandleSessionRangeQuery(
+            4,
+            mkSet(8, 9)
         );
 
         // not preset, so empty result iter
@@ -1008,7 +1509,7 @@ public class IQv2StoreIntegrationTest {
         assertThat(
             result.getGlobalResult().getFailureMessage(),
             is("Global stores do not yet support the KafkaStreams#query API."
-                   + " Use KafkaStreams#store instead.")
+                + " Use KafkaStreams#store instead.")
         );
     }
 
@@ -1048,19 +1549,19 @@ public class IQv2StoreIntegrationTest {
     }
 
     public <V> void shouldHandleKeyQuery(
-            final Integer key,
-            final Function<V, Integer> valueExtactor,
-            final Integer expectedValue) {
+        final Integer key,
+        final Function<V, Integer> valueExtactor,
+        final Integer expectedValue) {
 
         final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
         final StateQueryRequest<V> request =
-                inStore(STORE_NAME)
-                        .withQuery(query)
-                        .withPartitions(mkSet(0, 1))
-                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<V> result =
-                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         final QueryResult<V> queryResult = result.getOnlyPartitionResult();
         final boolean failure = queryResult.isFailure();
@@ -1071,8 +1572,8 @@ public class IQv2StoreIntegrationTest {
 
         assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
         assertThrows(
-                IllegalArgumentException.class,
-                queryResult::getFailureMessage
+            IllegalArgumentException.class,
+            queryResult::getFailureMessage
         );
 
         final V result1 = queryResult.getResult();
@@ -1080,7 +1581,7 @@ public class IQv2StoreIntegrationTest {
         assertThat(integer, is(expectedValue));
         assertThat(queryResult.getExecutionInfo(), is(empty()));
         assertThat(queryResult.getPosition(), is(POSITION_0));
-    }   
+    }
 
     public <V> void shouldHandleRangeQuery(
         final Optional<Integer> lower,
@@ -1189,21 +1690,21 @@ public class IQv2StoreIntegrationTest {
     }
 
     public <V> void shouldHandleWindowRangeQuery(
-            final Instant timeFrom,
-            final Instant timeTo,
-            final Function<V, Integer> valueExtactor,
-            final Set<Integer> expectedValue) {
+        final Instant timeFrom,
+        final Instant timeTo,
+        final Function<V, Integer> valueExtactor,
+        final Set<Integer> expectedValue) {
 
         final WindowRangeQuery<Integer, V> query = 
WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
 
         final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> 
request =
-                inStore(STORE_NAME)
-                        .withQuery(query)
-                        .withPartitions(mkSet(0, 1))
-                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
 
         final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
-                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         if (result.getGlobalResult() != null) {
             fail("global tables aren't implemented");
@@ -1218,12 +1719,12 @@ public class IQv2StoreIntegrationTest {
                 assertThat(queryResult.get(partition).isSuccess(), is(true));
 
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureReason
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureReason
                 );
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureMessage
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureMessage
                 );
 
                 try (final KeyValueIterator<Windowed<Integer>, V> iterator = 
queryResult.get(partition).getResult()) {
@@ -1239,18 +1740,18 @@ public class IQv2StoreIntegrationTest {
     }
 
     public <V> void shouldHandleSessionRangeQuery(
-            final Integer key,
-            final Set<Integer> expectedValue) {
+        final Integer key,
+        final Set<Integer> expectedValue) {
 
         final WindowRangeQuery<Integer, V> query = 
WindowRangeQuery.withKey(key);
 
         final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> 
request =
-                inStore(STORE_NAME)
-                        .withQuery(query)
-                        .withPartitions(mkSet(0, 1))
-                        .withPositionBound(PositionBound.at(INPUT_POSITION));
+            inStore(STORE_NAME)
+                .withQuery(query)
+                .withPartitions(mkSet(0, 1))
+                .withPositionBound(PositionBound.at(INPUT_POSITION));
         final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
-                IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
 
         if (result.getGlobalResult() != null) {
             fail("global tables aren't implemented");
@@ -1265,12 +1766,12 @@ public class IQv2StoreIntegrationTest {
                 assertThat(queryResult.get(partition).isSuccess(), is(true));
 
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureReason
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureReason
                 );
                 assertThrows(
-                        IllegalArgumentException.class,
-                        queryResult.get(partition)::getFailureMessage
+                    IllegalArgumentException.class,
+                    queryResult.get(partition)::getFailureMessage
                 );
 
                 try (final KeyValueIterator<Windowed<Integer>, V> iterator = 
queryResult.get(partition).getResult()) {


Reply via email to