This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 732bffcae6a KAFKA-15569: test and add test cases in
IQv2StoreIntegrationTest (#14523)
732bffcae6a is described below
commit 732bffcae6ad049d894e4dffe1907e8ceeb74a60
Author: Hanyu Zheng <[email protected]>
AuthorDate: Mon Oct 16 17:30:05 2023 -0700
KAFKA-15569: test and add test cases in IQv2StoreIntegrationTest (#14523)
Reviewers: Matthias J. Sax <[email protected]>
---
.../integration/IQv2StoreIntegrationTest.java | 651 ++++++++++++++++++---
1 file changed, 576 insertions(+), 75 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index bde36508321..26d86896296 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -140,7 +140,7 @@ public class IQv2StoreIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
private static final Position POSITION_0 =
- Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0,
1L)))));
+ Position.fromMap(mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0,
5L)))));
public static class UnknownQuery implements Query<Void> { }
@@ -408,13 +408,15 @@ public class IQv2StoreIntegrationTest {
final List<Future<RecordMetadata>> futures = new LinkedList<>();
try (final Producer<Integer, Integer> producer = new
KafkaProducer<>(producerProps)) {
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < 10; i++) {
+ final int key = i / 2;
+ final int partition = key % partitions;
final Future<RecordMetadata> send = producer.send(
new ProducerRecord<>(
INPUT_TOPIC_NAME,
- i % partitions,
- RECORD_TIME,
- i,
+ partition,
+ WINDOW_START + Duration.ofMinutes(2).toMillis() * i,
+ key,
i,
null
)
@@ -438,8 +440,8 @@ public class IQv2StoreIntegrationTest {
assertThat(INPUT_POSITION, equalTo(
Position
.emptyPosition()
- .withComponent(INPUT_TOPIC_NAME, 0, 1L)
- .withComponent(INPUT_TOPIC_NAME, 1, 1L)
+ .withComponent(INPUT_TOPIC_NAME, 0, 5L)
+ .withComponent(INPUT_TOPIC_NAME, 1, 3L)
));
}
@@ -650,12 +652,13 @@ public class IQv2StoreIntegrationTest {
public void process(final Record<Integer, Integer> record) {
final TimestampedWindowStore<Integer, Integer> stateStore =
context().getStateStore(windowStoreStoreBuilder.name());
+ // We don't re-implement the DSL logic (which implements
sum) but instead just keep the latest value per window
stateStore.put(
record.key(),
ValueAndTimestamp.make(
record.value(), record.timestamp()
),
- WINDOW_START
+ (record.timestamp() / WINDOW_SIZE.toMillis()) *
WINDOW_SIZE.toMillis()
);
}
};
@@ -671,7 +674,8 @@ public class IQv2StoreIntegrationTest {
public void process(final Record<Integer, Integer> record)
{
final WindowStore<Integer, Integer> stateStore =
context().getStateStore(windowStoreStoreBuilder.name());
- stateStore.put(record.key(), record.value(),
WINDOW_START);
+ // We don't re-implement the DSL logic (which
implements sum) but instead just keep the latest value per window
+ stateStore.put(record.key(), record.value(),
(record.timestamp() / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis());
}
};
}
@@ -716,7 +720,9 @@ public class IQv2StoreIntegrationTest {
final SessionStore<Integer, Integer> stateStore =
context().getStateStore(sessionStoreStoreBuilder.name());
stateStore.put(
- new Windowed<>(record.key(), new
SessionWindow(WINDOW_START, WINDOW_START)),
+ // we do not re-implement the actual session-window logic
from the DSL here to keep the test simple,
+ // but instead just put each record into its own session
+ new Windowed<>(record.key(), new
SessionWindow(record.timestamp(), record.timestamp())),
record.value()
);
}
@@ -772,17 +778,27 @@ public class IQv2StoreIntegrationTest {
shouldRejectUnknownQuery();
shouldCollectExecutionInfo();
shouldCollectExecutionInfoUnderFailure();
-
+ final String kind = this.kind;
if (storeToTest.keyValue()) {
if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer>
valueExtractor =
ValueAndTimestamp::value;
- shouldHandleKeyQuery(2, valueExtractor, 2);
- shouldHandleRangeQueries(valueExtractor);
+ if (kind.equals("DSL")) {
+ shouldHandleKeyQuery(2, valueExtractor, 5);
+ shouldHandleRangeDSLQueries(valueExtractor);
+ } else {
+ shouldHandleKeyQuery(2, valueExtractor, 5);
+ shouldHandleRangePAPIQueries(valueExtractor);
+ }
} else {
final Function<Integer, Integer> valueExtractor =
Function.identity();
- shouldHandleKeyQuery(2, valueExtractor, 2);
- shouldHandleRangeQueries(valueExtractor);
+ if (kind.equals("DSL")) {
+ shouldHandleKeyQuery(2, valueExtractor, 5);
+ shouldHandleRangeDSLQueries(valueExtractor);
+ } else {
+ shouldHandleKeyQuery(2, valueExtractor, 5);
+ shouldHandleRangePAPIQueries(valueExtractor);
+ }
}
}
@@ -790,19 +806,33 @@ public class IQv2StoreIntegrationTest {
if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer>
valueExtractor =
ValueAndTimestamp::value;
- shouldHandleWindowKeyQueries(valueExtractor);
- shouldHandleWindowRangeQueries(valueExtractor);
+ if (kind.equals("DSL")) {
+ shouldHandleWindowKeyDSLQueries(valueExtractor);
+ shouldHandleWindowRangeDSLQueries(valueExtractor);
+ } else {
+ shouldHandleWindowKeyPAPIQueries(valueExtractor);
+ shouldHandleWindowRangePAPIQueries(valueExtractor);
+ }
} else {
final Function<Integer, Integer> valueExtractor =
Function.identity();
- shouldHandleWindowKeyQueries(valueExtractor);
- shouldHandleWindowRangeQueries(valueExtractor);
+ if (kind.equals("DSL")) {
+ shouldHandleWindowKeyDSLQueries(valueExtractor);
+ shouldHandleWindowRangeDSLQueries(valueExtractor);
+ } else {
+ shouldHandleWindowKeyPAPIQueries(valueExtractor);
+ shouldHandleWindowRangePAPIQueries(valueExtractor);
+ }
}
}
if (storeToTest.isSession()) {
// Note there's no "timestamped" differentiation here.
// Idiosyncratically, SessionStores are _never_
timestamped.
- shouldHandleSessionKeyQueries();
+ if (kind.equals("DSL")) {
+ shouldHandleSessionKeyDSLQueries();
+ } else {
+ shouldHandleSessionKeyPAPIQueries();
+ }
}
}
} catch (final AssertionError e) {
@@ -812,57 +842,211 @@ public class IQv2StoreIntegrationTest {
}
- private <T> void shouldHandleRangeQueries(final Function<T, Integer>
extractor) {
+ private <T> void shouldHandleRangeDSLQueries(final Function<T, Integer>
extractor) {
+ shouldHandleRangeQuery(
+ Optional.of(0),
+ Optional.of(4),
+ extractor,
+ mkSet(1, 3, 5, 7, 9)
+ );
+
shouldHandleRangeQuery(
Optional.of(1),
Optional.of(3),
extractor,
- mkSet(1, 2, 3)
-
+ mkSet(3, 5, 7)
);
+
shouldHandleRangeQuery(
- Optional.of(1),
+ Optional.of(3),
Optional.empty(),
extractor,
- mkSet(1, 2, 3)
+ mkSet(7, 9)
+ );
+ shouldHandleRangeQuery(
+ Optional.empty(),
+ Optional.of(3),
+ extractor,
+ mkSet(1, 3, 5, 7)
);
+
shouldHandleRangeQuery(
Optional.empty(),
- Optional.of(1),
+ Optional.empty(),
extractor,
- mkSet(0, 1)
+ mkSet(1, 3, 5, 7, 9)
+ );
+ }
+
+ private <T> void shouldHandleRangePAPIQueries(final Function<T, Integer>
extractor) {
+ shouldHandleRangeQuery(
+ Optional.of(0),
+ Optional.of(4),
+ extractor,
+ mkSet(1, 3, 5, 7, 9)
+ );
+ shouldHandleRangeQuery(
+ Optional.of(1),
+ Optional.of(3),
+ extractor,
+ mkSet(3, 5, 7)
);
+
shouldHandleRangeQuery(
+ Optional.of(3),
Optional.empty(),
+ extractor,
+ mkSet(7, 9)
+ );
+
+ shouldHandleRangeQuery(
Optional.empty(),
+ Optional.of(3),
extractor,
- mkSet(0, 1, 2, 3)
+ mkSet(1, 3, 5, 7)
+ );
+ shouldHandleRangeQuery(
+ Optional.empty(),
+ Optional.empty(),
+ extractor,
+ mkSet(1, 3, 5, 7, 9)
);
}
- private <T> void shouldHandleWindowKeyQueries(final Function<T, Integer>
extractor) {
+ private <T> void shouldHandleWindowKeyDSLQueries(final Function<T,
Integer> extractor) {
// tightest possible start range
+ shouldHandleWindowKeyQuery(
+ 0,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START),
+ extractor,
+ mkSet(1)
+ );
+
+ // miss the window start range
+ shouldHandleWindowKeyQuery(
+ 0,
+ Instant.ofEpochMilli(WINDOW_START - 1),
+ Instant.ofEpochMilli(WINDOW_START - 1),
+ extractor,
+ mkSet()
+ );
+
+ // do the window key query at the first window and the key of record
which we want to query is 2
shouldHandleWindowKeyQuery(
2,
Instant.ofEpochMilli(WINDOW_START),
Instant.ofEpochMilli(WINDOW_START),
extractor,
+ mkSet()
+ );
+
+ // miss the key
+ shouldHandleWindowKeyQuery(
+ 999,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START),
+ extractor,
+ mkSet()
+ );
+
+ // miss both
+ shouldHandleWindowKeyQuery(
+ 999,
+ Instant.ofEpochMilli(WINDOW_START - 1),
+ Instant.ofEpochMilli(WINDOW_START - 1),
+ extractor,
+ mkSet()
+ );
+
+ // do the window key query at the first and the second windows and the
key of record which we want to query is 0
+ shouldHandleWindowKeyQuery(
+ 0,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(5).toMillis()),
+ extractor,
+ mkSet(1)
+ );
+
+ // do the window key query at the first window and the key of record
which we want to query is 1
+ shouldHandleWindowKeyQuery(
+ 1,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START),
+ extractor,
mkSet(2)
);
- // miss the window start range
+ // do the window key query at the second and the third windows and the
key of record which we want to query is 2
shouldHandleWindowKeyQuery(
2,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(10).toMillis()),
+ extractor,
+ mkSet(4, 5)
+ );
+
+ // do the window key query at the second and the third windows and the
key of record which we want to query is 3
+ shouldHandleWindowKeyQuery(
+ 3,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(10).toMillis()),
+ extractor,
+ mkSet(13)
+ );
+
+ // do the window key query at the fourth and the fifth windows and the
key of record which we want to query is 4
+ shouldHandleWindowKeyQuery(
+ 4,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(15).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(20).toMillis()),
+ extractor,
+ mkSet(17)
+ );
+
+ // do the window key query at the fifth window and the key of record
which we want to query is 4
+ shouldHandleWindowKeyQuery(
+ 4,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(20).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(24).toMillis()),
+ extractor,
+ mkSet()
+ );
+ }
+
+ private <T> void shouldHandleWindowKeyPAPIQueries(final Function<T,
Integer> extractor) {
+
+ // tightest possible start range
+ shouldHandleWindowKeyQuery(
+ 0,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START),
+ extractor,
+ mkSet(1)
+ );
+
+ // miss the window start range
+ shouldHandleWindowKeyQuery(
+ 0,
Instant.ofEpochMilli(WINDOW_START - 1),
Instant.ofEpochMilli(WINDOW_START - 1),
extractor,
mkSet()
);
+ // do the window key query at the first window and the key of record
which we want to query is 2
+ shouldHandleWindowKeyQuery(
+ 2,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START),
+ extractor,
+ mkSet()
+ );
+
// miss the key
shouldHandleWindowKeyQuery(
999,
@@ -880,19 +1064,179 @@ public class IQv2StoreIntegrationTest {
extractor,
mkSet()
);
+
+ // do the window key query at the first and the second windows and the
key of record which we want to query is 0
+ shouldHandleWindowKeyQuery(
+ 0,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(5).toMillis()),
+ extractor,
+ mkSet(1)
+ );
+
+ // do the window key query at the first window and the key of record
which we want to query is 1
+ shouldHandleWindowKeyQuery(
+ 1,
+ Instant.ofEpochMilli(WINDOW_START),
+ Instant.ofEpochMilli(WINDOW_START),
+ extractor,
+ mkSet(2)
+ );
+
+ // do the window key query at the second and the third windows and the
key of record which we want to query is 2
+ shouldHandleWindowKeyQuery(
+ 2,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(10).toMillis()),
+ extractor,
+ mkSet(4, 5)
+ );
+
+ // do the window key query at the second and the third windows and the
key of record which we want to query is 3
+ shouldHandleWindowKeyQuery(
+ 3,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(10).toMillis()),
+ extractor,
+ mkSet(7)
+ );
+
+ // do the window key query at the fourth and the fifth windows and the
key of record which we want to query is 4
+ shouldHandleWindowKeyQuery(
+ 4,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(15).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(20).toMillis()),
+ extractor,
+ mkSet(9)
+ );
+
+ // do the window key query at the fifth window and the key of record
which we want to query is 4
+ shouldHandleWindowKeyQuery(
+ 4,
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(20).toMillis()),
+ Instant.ofEpochMilli(WINDOW_START +
Duration.ofMinutes(24).toMillis()),
+ extractor,
+ mkSet()
+ );
}
- private <T> void shouldHandleWindowRangeQueries(final Function<T, Integer>
extractor) {
+ private <T> void shouldHandleWindowRangeDSLQueries(final Function<T,
Integer> extractor) {
final long windowSize = WINDOW_SIZE.toMillis();
final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+ // miss the window start
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart - 1),
+ Instant.ofEpochMilli(windowStart - 1),
+ extractor,
+ mkSet()
+ );
+
+ // do the query at the first window
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart),
+ extractor,
+ mkSet(1, 2)
+ );
+
+ // do the query at the first and the second windows
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(5).toMillis()),
+ extractor,
+ mkSet(1, 2, 3, 4)
+ );
+
+ // do the query at the second and the third windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(10).toMillis()),
+ extractor,
+ mkSet(3, 4, 5, 13)
+ );
+
+ // do the query at the third and the fourth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(10).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ extractor,
+ mkSet(17, 5, 13)
+ );
+
+ // do the query at the fourth and the fifth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(20).toMillis()),
+ extractor,
+ mkSet(17)
+ );
+
+ //do the query at the fifth and the sixth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(20).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(25).toMillis()),
+ extractor,
+ mkSet()
+ );
+
+ // do the query from the second to the fourth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ extractor,
+ mkSet(17, 3, 4, 5, 13)
+ );
+
+ // do the query from the first to the fourth windows
+ shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
extractor,
- mkSet(0, 1, 2, 3)
+ mkSet(1, 17, 2, 3, 4, 5, 13)
);
+ // Should fail to execute this query on a WindowStore.
+ final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
+
+ final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>>
request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>,
T>>> queryResult =
+ result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final QueryResult<KeyValueIterator<Windowed<Integer>, T>>
partitionResult =
+ queryResult.get(partition);
+ final boolean failure = partitionResult.isFailure();
+ if (!failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(partitionResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(partitionResult.getFailureMessage(), matchesPattern(
+ "This store"
+ + " \\(class
org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
+ + " doesn't know how to execute the given query"
+ + " \\(WindowRangeQuery\\{key=Optional\\[2],
timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+ + " because WindowStores only supports
WindowRangeQuery.withWindowStartRange\\."
+ + " Contact the store maintainer if you need support
for a new query type\\."
+ ));
+ }
+ }
+ }
+
+ private <T> void shouldHandleWindowRangePAPIQueries(final Function<T,
Integer> extractor) {
+ final long windowSize = WINDOW_SIZE.toMillis();
+ final long windowStart = (RECORD_TIME / windowSize) * windowSize;
+
// miss the window start
shouldHandleWindowRangeQuery(
Instant.ofEpochMilli(windowStart - 1),
@@ -901,17 +1245,81 @@ public class IQv2StoreIntegrationTest {
mkSet()
);
+ // do the query at the first window
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart),
+ extractor,
+ mkSet(1, 2)
+ );
+
+ // do the query at the first and the second windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(5).toMillis()),
+ extractor,
+ mkSet(1, 2, 3, 4)
+ );
+
+ // do the query at the second and the third windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(10).toMillis()),
+ extractor,
+ mkSet(3, 4, 5, 7)
+ );
+
+ // do the query at the third and the fourth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(10).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ extractor,
+ mkSet(5, 7, 9)
+ );
+
+ // do the query at the fourth and the fifth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(20).toMillis()),
+ extractor,
+ mkSet(9)
+ );
+
+ //do the query at the fifth and the sixth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(20).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(25).toMillis()),
+ extractor,
+ mkSet()
+ );
+
+ // do the query from the second to the fourth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(5).toMillis()),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ extractor,
+ mkSet(3, 4, 5, 7, 9)
+ );
+
+ // do the query from the first to the fourth windows
+ shouldHandleWindowRangeQuery(
+ Instant.ofEpochMilli(windowStart),
+ Instant.ofEpochMilli(windowStart +
Duration.ofMinutes(15).toMillis()),
+ extractor,
+ mkSet(1, 2, 3, 4, 5, 7, 9)
+ );
+
// Should fail to execute this query on a WindowStore.
final WindowRangeQuery<Integer, T> query = WindowRangeQuery.withKey(2);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>>
request =
- inStore(STORE_NAME)
- .withQuery(query)
- .withPartitions(mkSet(0, 1))
- .withPositionBound(PositionBound.at(INPUT_POSITION));
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
- IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
@@ -938,10 +1346,103 @@ public class IQv2StoreIntegrationTest {
}
}
- private <T> void shouldHandleSessionKeyQueries() {
+ private <T> void shouldHandleSessionKeyDSLQueries() {
+ shouldHandleSessionRangeQuery(
+ 0,
+ mkSet(1)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 1,
+ mkSet(5)
+ );
+
shouldHandleSessionRangeQuery(
2,
- mkSet(2)
+ mkSet(9)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 3,
+ mkSet(13)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 4,
+ mkSet(17)
+ );
+
+ // not present, so empty result iter
+ shouldHandleSessionRangeQuery(
+ 999,
+ mkSet()
+ );
+
+ // Should fail to execute this query on a SessionStore.
+ final WindowRangeQuery<Integer, T> query =
+ WindowRangeQuery.withWindowStartRange(
+ Instant.ofEpochMilli(0L),
+ Instant.ofEpochMilli(0L)
+ );
+
+ final StateQueryRequest<KeyValueIterator<Windowed<Integer>, T>>
request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<KeyValueIterator<Windowed<Integer>, T>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ if (result.getGlobalResult() != null) {
+ fail("global tables aren't implemented");
+ } else {
+ final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>,
T>>> queryResult =
+ result.getPartitionResults();
+ for (final int partition : queryResult.keySet()) {
+ final QueryResult<KeyValueIterator<Windowed<Integer>, T>>
partitionResult =
+ queryResult.get(partition);
+ final boolean failure = partitionResult.isFailure();
+ if (!failure) {
+ throw new AssertionError(queryResult.toString());
+ }
+ assertThat(partitionResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(partitionResult.getFailureMessage(), is(
+ "This store"
+ + " (class
org.apache.kafka.streams.state.internals.MeteredSessionStore)"
+ + " doesn't know how to execute the given query"
+ + " (WindowRangeQuery{key=Optional.empty,
timeFrom=Optional[1970-01-01T00:00:00Z],
timeTo=Optional[1970-01-01T00:00:00Z]})"
+ + " because SessionStores only support
WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support
for a new query type."
+ ));
+ }
+ }
+ }
+
+ private <T> void shouldHandleSessionKeyPAPIQueries() {
+ shouldHandleSessionRangeQuery(
+ 0,
+ mkSet(0, 1)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 1,
+ mkSet(2, 3)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 2,
+ mkSet(4, 5)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 3,
+ mkSet(6, 7)
+ );
+
+ shouldHandleSessionRangeQuery(
+ 4,
+ mkSet(8, 9)
);
// not present, so empty result iter
@@ -1008,7 +1509,7 @@ public class IQv2StoreIntegrationTest {
assertThat(
result.getGlobalResult().getFailureMessage(),
is("Global stores do not yet support the KafkaStreams#query API."
- + " Use KafkaStreams#store instead.")
+ + " Use KafkaStreams#store instead.")
);
}
@@ -1048,19 +1549,19 @@ public class IQv2StoreIntegrationTest {
}
public <V> void shouldHandleKeyQuery(
- final Integer key,
- final Function<V, Integer> valueExtactor,
- final Integer expectedValue) {
+ final Integer key,
+ final Function<V, Integer> valueExtactor,
+ final Integer expectedValue) {
final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
final StateQueryRequest<V> request =
- inStore(STORE_NAME)
- .withQuery(query)
- .withPartitions(mkSet(0, 1))
- .withPositionBound(PositionBound.at(INPUT_POSITION));
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<V> result =
- IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
final QueryResult<V> queryResult = result.getOnlyPartitionResult();
final boolean failure = queryResult.isFailure();
@@ -1071,8 +1572,8 @@ public class IQv2StoreIntegrationTest {
assertThrows(IllegalArgumentException.class,
queryResult::getFailureReason);
assertThrows(
- IllegalArgumentException.class,
- queryResult::getFailureMessage
+ IllegalArgumentException.class,
+ queryResult::getFailureMessage
);
final V result1 = queryResult.getResult();
@@ -1080,7 +1581,7 @@ public class IQv2StoreIntegrationTest {
assertThat(integer, is(expectedValue));
assertThat(queryResult.getExecutionInfo(), is(empty()));
assertThat(queryResult.getPosition(), is(POSITION_0));
- }
+ }
public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower,
@@ -1189,21 +1690,21 @@ public class IQv2StoreIntegrationTest {
}
public <V> void shouldHandleWindowRangeQuery(
- final Instant timeFrom,
- final Instant timeTo,
- final Function<V, Integer> valueExtactor,
- final Set<Integer> expectedValue) {
+ final Instant timeFrom,
+ final Instant timeTo,
+ final Function<V, Integer> valueExtactor,
+ final Set<Integer> expectedValue) {
final WindowRangeQuery<Integer, V> query =
WindowRangeQuery.withWindowStartRange(timeFrom, timeTo);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>>
request =
- inStore(STORE_NAME)
- .withQuery(query)
- .withPartitions(mkSet(0, 1))
- .withPositionBound(PositionBound.at(INPUT_POSITION));
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
- IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
@@ -1218,12 +1719,12 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.get(partition).isSuccess(), is(true));
assertThrows(
- IllegalArgumentException.class,
- queryResult.get(partition)::getFailureReason
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureReason
);
assertThrows(
- IllegalArgumentException.class,
- queryResult.get(partition)::getFailureMessage
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureMessage
);
try (final KeyValueIterator<Windowed<Integer>, V> iterator =
queryResult.get(partition).getResult()) {
@@ -1239,18 +1740,18 @@ public class IQv2StoreIntegrationTest {
}
public <V> void shouldHandleSessionRangeQuery(
- final Integer key,
- final Set<Integer> expectedValue) {
+ final Integer key,
+ final Set<Integer> expectedValue) {
final WindowRangeQuery<Integer, V> query =
WindowRangeQuery.withKey(key);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>>
request =
- inStore(STORE_NAME)
- .withQuery(query)
- .withPartitions(mkSet(0, 1))
- .withPositionBound(PositionBound.at(INPUT_POSITION));
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(mkSet(0, 1))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
- IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
@@ -1265,12 +1766,12 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.get(partition).isSuccess(), is(true));
assertThrows(
- IllegalArgumentException.class,
- queryResult.get(partition)::getFailureReason
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureReason
);
assertThrows(
- IllegalArgumentException.class,
- queryResult.get(partition)::getFailureMessage
+ IllegalArgumentException.class,
+ queryResult.get(partition)::getFailureMessage
);
try (final KeyValueIterator<Windowed<Integer>, V> iterator =
queryResult.get(partition).getResult()) {