This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fd6a9b3e22 Kafka 12960: Follow up Commit to filter expired records
from Windowed/Session Stores (#12756)
7fd6a9b3e22 is described below
commit 7fd6a9b3e22eabfe31d0f3e7792d03f09cf8aabc
Author: vamossagar12 <[email protected]>
AuthorDate: Mon Nov 7 09:23:34 2022 +0530
Kafka 12960: Follow up Commit to filter expired records from
Windowed/Session Stores (#12756)
KAFKA-12960: Enforcing strict retention time for WindowStore and
SessionStore
Reviewers: Luke Chen <[email protected]>, Vicky Papavasileiou
---
.../streams/kstream/internals/KStreamImplJoin.java | 3 +-
.../kstream/internals/KStreamKStreamSelfJoin.java | 11 +-
...stractDualSchemaRocksDBSegmentedBytesStore.java | 47 ++++-
.../AbstractRocksDBSegmentedBytesStore.java | 75 ++++++--
...tractRocksDBTimeOrderedSegmentedBytesStore.java | 70 +++++---
.../internals/RocksDBSegmentedBytesStore.java | 2 +-
.../RocksDBTimestampedSegmentedBytesStore.java | 2 +-
.../TimeWindowedKStreamIntegrationTest.java | 66 ++++---
.../KStreamSlidingWindowAggregateTest.java | 18 +-
.../internals/KStreamWindowAggregateTest.java | 61 ++++---
.../internals/SessionWindowedKStreamImplTest.java | 63 +++++--
.../internals/TimeWindowedKStreamImplTest.java | 121 +++++++++----
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 190 +++++++++++++--------
.../AbstractRocksDBSegmentedBytesStoreTest.java | 163 +++++++++---------
.../internals/AbstractSessionBytesStoreTest.java | 49 +++++-
.../internals/AbstractWindowBytesStoreTest.java | 4 +-
.../CachingPersistentWindowStoreTest.java | 3 +-
.../state/internals/MeteredSessionStoreTest.java | 49 ++++++
.../state/internals/MeteredWindowStoreTest.java | 14 ++
.../state/internals/RocksDBWindowStoreTest.java | 143 ++++++++++++----
20 files changed, 799 insertions(+), 355 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 32a2a4d436f..b2405db7317 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -186,7 +186,8 @@ class KStreamImplJoin {
thisWindowStore.name(),
internalWindows,
joiner,
- sharedTimeTracker
+ sharedTimeTracker,
+ windows.size() + windows.gracePeriodMs()
);
final PassThrough<K, VOut> joinMerge = new PassThrough<>();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
index f3ce64dafeb..a6af2a4e082 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
@@ -41,6 +41,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
private final long joinThisAfterMs;
private final long joinOtherBeforeMs;
private final long joinOtherAfterMs;
+ private final long retentionPeriod;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joinerThis;
private final TimeTracker sharedTimeTracker;
@@ -49,7 +50,8 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
final String windowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends
VOut> joinerThis,
- final TimeTracker sharedTimeTracker) {
+ final TimeTracker sharedTimeTracker,
+ final long retentionPeriod) {
this.windowName = windowName;
this.joinThisBeforeMs = windows.beforeMs;
@@ -58,6 +60,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
this.joinOtherAfterMs = windows.beforeMs;
this.joinerThis = joinerThis;
this.sharedTimeTracker = sharedTimeTracker;
+ this.retentionPeriod = retentionPeriod;
}
@Override
@@ -93,6 +96,8 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
.withValue(joinerThis.apply(record.key(), record.value(), (V2)
record.value()))
.withTimestamp(inputRecordTimestamp);
sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+ // We emit the self record only if it isn't expired.
+ final boolean emitSelfRecord = inputRecordTimestamp >
sharedTimeTracker.streamTime - retentionPeriod + 1;
// Join current record with other
try (final WindowStoreIterator<V2> iter =
windowStore.fetch(record.key(), timeFrom, timeTo)) {
@@ -120,7 +125,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
// This is needed so that output records follow timestamp
order
// Join this with self
- if (inputRecordTimestamp < maxRecordTimestamp &&
!emittedJoinWithSelf) {
+ if (inputRecordTimestamp < maxRecordTimestamp &&
!emittedJoinWithSelf && emitSelfRecord) {
emittedJoinWithSelf = true;
context().forward(selfRecord);
}
@@ -134,7 +139,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
}
// Join this with self
- if (!emittedJoinWithSelf) {
+ if (!emittedJoinWithSelf && emitSelfRecord) {
context().forward(selfRecord);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 95c1d8d8c81..b446a52eb5f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -52,6 +52,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final AbstractSegments<S> segments;
protected final KeySchema baseKeySchema;
protected final Optional<KeySchema> indexKeySchema;
+ private final long retentionPeriod;
protected ProcessorContext context;
@@ -66,22 +67,27 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
final KeySchema baseKeySchema,
final Optional<KeySchema>
indexKeySchema,
- final AbstractSegments<S>
segments) {
+ final AbstractSegments<S>
segments,
+ final long retentionPeriod) {
this.name = name;
this.baseKeySchema = baseKeySchema;
this.indexKeySchema = indexKeySchema;
this.segments = segments;
+ this.retentionPeriod = retentionPeriod;
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
+
+ final long actualFrom = getActualFrom(0, baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
final List<S> searchSpace = segments.allSegments(true);
- final Bytes from = baseKeySchema.lowerRange(null, 0);
+ final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE,
true),
+ baseKeySchema.hasNextCondition(null, null, actualFrom,
Long.MAX_VALUE, true),
from,
to,
true);
@@ -89,13 +95,16 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
+
+ final long actualFrom = getActualFrom(0, baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
final List<S> searchSpace = segments.allSegments(false);
- final Bytes from = baseKeySchema.lowerRange(null, 0);
+ final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE,
false),
+ baseKeySchema.hasNextCondition(null, null, actualFrom,
Long.MAX_VALUE, false),
from,
to,
false);
@@ -119,6 +128,15 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
abstract protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes
baseKey, final byte[] baseValue);
+ // isTimeFirstWindowSchema == true implies ON_WINDOW_CLOSE semantics. There is an edge case
+ // when retentionPeriod equals the grace period: if we added 1, actualFrom would exceed `to`,
+ // which would lead to no records being returned, so the +1 is applied only to other schemas.
+ protected long getActualFrom(final long from, final boolean
isTimeFirstWindowSchema) {
+ return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime -
retentionPeriod) :
+ Math.max(from, observedStreamTime - retentionPeriod + 1);
+
+ }
+
// For testing
void putIndex(final Bytes indexKey, final byte[] value) {
if (!hasIndex()) {
@@ -191,7 +209,24 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
@Override
public byte[] get(final Bytes rawKey) {
- final S segment =
segments.getSegmentForTimestamp(baseKeySchema.segmentTimestamp(rawKey));
+ final long timestampFromRawKey =
baseKeySchema.segmentTimestamp(rawKey);
+ // check if timestamp is expired
+
+ if (baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
+ if (timestampFromRawKey < observedStreamTime - retentionPeriod) {
+ LOG.debug("Record with key {} is expired as timestamp from key
({}) < actual stream time ({})",
+ rawKey.toString(), timestampFromRawKey,
observedStreamTime - retentionPeriod);
+ return null;
+ }
+ } else {
+ if (timestampFromRawKey < observedStreamTime - retentionPeriod +
1) {
+ LOG.debug("Record with key {} is expired as timestamp from key
({}) < actual stream time ({})",
+ rawKey.toString(), timestampFromRawKey,
observedStreamTime - retentionPeriod + 1);
+ return null;
+ }
+ }
+
+ final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
if (segment == null) {
return null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 13f914d075a..bcfe30b30e9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -52,6 +52,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
private final String metricScope;
+ private final long retentionPeriod;
private final KeySchema keySchema;
private ProcessorContext context;
@@ -65,10 +66,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
AbstractRocksDBSegmentedBytesStore(final String name,
final String metricScope,
+ final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
this.metricScope = metricScope;
+ this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;
}
@@ -91,19 +94,30 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
final long from,
final long to,
final boolean forward) {
- final List<S> searchSpace = keySchema.segmentsToSearch(segments, from,
to, forward);
+ final long actualFrom = getActualFrom(from);
- final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+ if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+ LOG.debug("Returning no records for key {} as to ({}) < actualFrom
({}) ", key.toString(), to, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments,
actualFrom, to, forward);
+
+ final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key,
actualFrom);
final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(key, key, from, to, forward),
+ keySchema.hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
}
+ private long getActualFrom(final long from) {
+ return Math.max(from, observedStreamTime - retentionPeriod + 1);
+ }
+
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
@@ -133,14 +147,21 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
return KeyValueIterators.emptyIterator();
}
- final List<S> searchSpace = keySchema.segmentsToSearch(segments, from,
to, forward);
+ final long actualFrom = getActualFrom(from);
- final Bytes binaryFrom = keyFrom == null ? null :
keySchema.lowerRange(keyFrom, from);
+ if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+ LOG.debug("Returning no records for keys {}/{} as to ({}) <
actualFrom ({}) ", keyFrom, keyTo, to, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments,
actualFrom, to, forward);
+
+ final Bytes binaryFrom = keyFrom == null ? null :
keySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = keyTo == null ? null :
keySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
+ keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to,
forward),
binaryFrom,
binaryTo,
forward);
@@ -148,11 +169,12 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> all() {
- final List<S> searchSpace = segments.allSegments(true);
+ final long actualFrom = getActualFrom(0);
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments,
actualFrom, Long.MAX_VALUE, true);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE,
true),
+ keySchema.hasNextCondition(null, null, actualFrom,
Long.MAX_VALUE, true),
null,
null,
true);
@@ -160,11 +182,13 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
- final List<S> searchSpace = segments.allSegments(false);
+ final long actualFrom = getActualFrom(0);
+
+ final List<S> searchSpace = keySchema.segmentsToSearch(segments,
actualFrom, Long.MAX_VALUE, false);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE,
false),
+ keySchema.hasNextCondition(null, null, actualFrom,
Long.MAX_VALUE, false),
null,
null,
false);
@@ -173,11 +197,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
- final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);
+ final long actualFrom = getActualFrom(timeFrom);
+
+ if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
+ LOG.debug("Returning no records for as timeTo ({}) < actualFrom
({}) ", timeTo, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = segments.segments(actualFrom, timeTo,
true);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
+ keySchema.hasNextCondition(null, null, actualFrom, timeTo,
true),
null,
null,
true);
@@ -186,11 +217,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long
timeFrom,
final long timeTo)
{
- final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);
+ final long actualFrom = getActualFrom(timeFrom);
+
+ if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
+ LOG.debug("Returning no records for as timeTo ({}) < actualFrom
({}) ", timeTo, actualFrom);
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<S> searchSpace = segments.segments(actualFrom, timeTo,
false);
return new SegmentIterator<>(
searchSpace.iterator(),
- keySchema.hasNextCondition(null, null, timeFrom, timeTo,
false),
+ keySchema.hasNextCondition(null, null, actualFrom, timeTo,
false),
null,
null,
false);
@@ -234,7 +272,14 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
@Override
public byte[] get(final Bytes key) {
- final S segment =
segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+ final long timestampFromKey = keySchema.segmentTimestamp(key);
+ // check if timestamp is expired
+ if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
+ LOG.debug("Record with key {} is expired as timestamp from key
({}) < actual stream time ({})",
+ key.toString(), timestampFromKey, observedStreamTime -
retentionPeriod + 1);
+ return null;
+ }
+ final S segment = segments.getSegmentForTimestamp(timestampFromKey);
if (segment == null) {
return null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index 0398f0ca060..f8217c6d066 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -87,7 +87,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
cachedValue = get(baseKey);
if (cachedValue == null) {
- // Key not in base store, inconsistency happened and
remove from index.
+ // Key is not in the base store or is expired; an inconsistency happened, so remove it from the index.
indexIterator.next();
AbstractRocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(key);
} else {
@@ -118,7 +118,7 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final KeySchema
baseKeySchema,
final Optional<KeySchema>
indexKeySchema) {
super(name, baseKeySchema, indexKeySchema,
- new KeyValueSegments(name, metricsScope, retention,
segmentInterval));
+ new KeyValueSegments(name, metricsScope, retention,
segmentInterval), retention);
}
@Override
@@ -141,28 +141,38 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
final long from,
final long to,
final boolean forward) {
+
+ final long actualFrom = getActualFrom(from, baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, from, to, forward);
+ final List<KeyValueSegment> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
+ forward);
- final Bytes binaryFrom =
indexKeySchema.get().lowerRangeFixedSize(key, from);
+ final Bytes binaryFrom =
indexKeySchema.get().lowerRangeFixedSize(key, actualFrom);
final Bytes binaryTo =
indexKeySchema.get().upperRangeFixedSize(key, to);
return getIndexToBaseStoreIterator(new SegmentIterator<>(
searchSpace.iterator(),
- indexKeySchema.get().hasNextCondition(key, key, from, to,
forward),
+ indexKeySchema.get().hasNextCondition(key, key, actualFrom,
to, forward),
binaryFrom,
binaryTo,
forward));
}
- final List<KeyValueSegment> searchSpace =
baseKeySchema.segmentsToSearch(segments, from, to, forward);
- final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from);
+ final List<KeyValueSegment> searchSpace =
baseKeySchema.segmentsToSearch(segments, actualFrom, to,
+ forward);
+
+ final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key,
actualFrom);
final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(key, key, from, to, forward),
+ baseKeySchema.hasNextCondition(key, key, actualFrom, to, forward),
binaryFrom,
binaryTo,
forward);
@@ -197,30 +207,36 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
return KeyValueIterators.emptyIterator();
}
+ final long actualFrom = getActualFrom(from, baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && to < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
if (indexKeySchema.isPresent()) {
- final List<KeyValueSegment> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, from, to,
+ final List<KeyValueSegment> searchSpace =
indexKeySchema.get().segmentsToSearch(segments, actualFrom, to,
forward);
- final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom,
from);
+ final Bytes binaryFrom = indexKeySchema.get().lowerRange(keyFrom,
actualFrom);
final Bytes binaryTo = indexKeySchema.get().upperRange(keyTo, to);
return getIndexToBaseStoreIterator(new SegmentIterator<>(
searchSpace.iterator(),
- indexKeySchema.get().hasNextCondition(keyFrom, keyTo, from,
to, forward),
+ indexKeySchema.get().hasNextCondition(keyFrom, keyTo,
actualFrom, to, forward),
binaryFrom,
binaryTo,
forward));
}
- final List<KeyValueSegment> searchSpace =
baseKeySchema.segmentsToSearch(segments, from, to,
+ final List<KeyValueSegment> searchSpace =
baseKeySchema.segmentsToSearch(segments, actualFrom, to,
forward);
- final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, from);
+ final Bytes binaryFrom = baseKeySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(keyTo, to);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
+ baseKeySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to,
forward),
binaryFrom,
binaryTo,
forward);
@@ -235,13 +251,20 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
- final List<KeyValueSegment> searchSpace = segments.segments(timeFrom,
timeTo, true);
- final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
+
+ final long actualFrom = getActualFrom(timeFrom, baseKeySchema
instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<KeyValueSegment> searchSpace =
segments.segments(actualFrom, timeTo, true);
+ final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo,
true),
+ baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo,
true),
binaryFrom,
binaryTo,
true);
@@ -250,13 +273,20 @@ public abstract class
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long
timeFrom,
final long timeTo)
{
- final List<KeyValueSegment> searchSpace = segments.segments(timeFrom,
timeTo, false);
- final Bytes binaryFrom = baseKeySchema.lowerRange(null, timeFrom);
+
+ final long actualFrom = getActualFrom(timeFrom, baseKeySchema
instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
+
+ if (baseKeySchema instanceof
PrefixedWindowKeySchemas.TimeFirstWindowKeySchema && timeTo < actualFrom) {
+ return KeyValueIterators.emptyIterator();
+ }
+
+ final List<KeyValueSegment> searchSpace =
segments.segments(actualFrom, timeTo, false);
+ final Bytes binaryFrom = baseKeySchema.lowerRange(null, actualFrom);
final Bytes binaryTo = baseKeySchema.upperRange(null, timeTo);
return new SegmentIterator<>(
searchSpace.iterator(),
- baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo,
false),
+ baseKeySchema.hasNextCondition(null, null, actualFrom, timeTo,
false),
binaryFrom,
binaryTo,
false);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 6c72fa64c5f..e7b7198d1cf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends
AbstractRocksDBSegmentedBytesSto
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
- super(name, metricsScope, keySchema, new KeyValueSegments(name,
metricsScope, retention, segmentInterval));
+ super(name, metricsScope, retention, keySchema, new
KeyValueSegments(name, metricsScope, retention, segmentInterval));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
index 7fd958c2c27..39f493c761b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends
AbstractRocksDBSegmen
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
- super(name, metricsScope, keySchema, new TimestampedSegments(name,
metricsScope, retention, segmentInterval));
+ super(name, metricsScope, retention, keySchema, new
TimestampedSegments(name, metricsScope, retention, segmentInterval));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 9abc2c9500d..91aa583060c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -64,6 +64,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.Collections;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
@@ -195,22 +196,25 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
+ // on window close
+ // observedStreamTime : 10, retentionPeriod: 10, actualFrom: 0,
timeTo: 0, timeFrom: 0
+ // observedStreamTime : 15, retentionPeriod: 10, actualFrom: 5,
timeTo: 5, timeFrom: 1
+ // observedStreamTime : 25, retentionPeriod: 10, actualFrom: 15,
timeTo: 15, timeFrom: 6
+
final List<KeyValueTimestamp<Windowed<String>, String>>
windowedMessages = receiveMessagesWithTimestamp(
- new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
- new StringDeserializer(),
- 10L,
- String.class,
- emitFinal ? 6 : 12);
+ new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ emitFinal ? 4 : 12);
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
expectResult = asList(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L,
10L)), "0+1+2", 5),
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L,
15L)), "0+2+3", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L,
15L)), "0+4+5", 11),
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(10L, 20L)), "0+3", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(10L, 20L)), "0+5+6", 15),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(15L, 25L)), "0+6", 15)
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(0L, 10L)), "0+1+2", 5),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(5L, 15L)), "0+2+3", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(5L, 15L)), "0+4+5", 11),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(15L, 25L)), "0+6", 15)
);
} else {
expectResult = asList(
@@ -260,20 +264,22 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
+ // on window close
+ // observedStreamTime : 15, retentionPeriod: 15, actualFrom: 0,
timeTo: 0, timeFrom: 0
+ // observedStreamTime : 25, retentionPeriod: 15, actualFrom: 10,
timeTo: 10, timeFrom: 1
+
final List<KeyValueTimestamp<Windowed<String>, String>>
windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
- emitFinal ? 6 : 13);
+ emitFinal ? 4 : 13);
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L,
10L)), "0+1+2", 5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L,
10L)), "0+4", 6),
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L,
15L)), "0+2+3", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L,
15L)), "0+4+5", 11),
new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(10L, 20L)), "0+3", 10),
new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(10L, 20L)), "0+5+6", 15)
);
@@ -342,12 +348,13 @@ public class TimeWindowedKStreamIntegrationTest {
startStreams();
+ // ON_WINDOW_CLOSE expires all records.
List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages =
receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
new StringDeserializer(),
10L,
String.class,
- emitFinal ? 5 : 9);
+ emitFinal ? 4 : 9);
List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
if (emitFinal) {
@@ -358,8 +365,6 @@ public class TimeWindowedKStreamIntegrationTest {
5),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L,
15L)), "0+L2,R2",
11),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(10L, 20L)),
- "0+L2,R2+L2,R2", 15),
new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(15L, 25L)),
"0+L2,R2", 15)
);
@@ -403,22 +408,27 @@ public class TimeWindowedKStreamIntegrationTest {
// Restart
startStreams();
- windowedMessages = receiveMessagesWithTimestamp(
- new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
- new StringDeserializer(),
- 10L,
- String.class,
- 2);
-
if (emitFinal) {
- // Output just new closed window for C
- expectResult = asList(
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(20L, 30L)),
- "0+L3,R3", 25),
+ windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(new StringDeserializer(),
10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ 1);
+
+ // Output just new/unexpired closed window for C
+ expectResult = Collections.singletonList(
new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(25L, 35L)),
"0+L3,R3", 25)
);
} else {
+ windowedMessages = receiveMessagesWithTimestamp(
+ new TimeWindowedDeserializer<>(new StringDeserializer(),
10L),
+ new StringDeserializer(),
+ 10L,
+ String.class,
+ 2);
+
expectResult = asList(
new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(30L, 40L)),
"0+L3,R3", 35),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 065c63270d3..50b0f8dcdde 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -1650,22 +1650,7 @@ public class KStreamSlidingWindowAggregateTest {
final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
if (emitFinal) {
- expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
- expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
- expected.put(4L, ValueAndTimestamp.make("ATU", 10L));
- expected.put(5L, ValueAndTimestamp.make("ABTU", 15L));
- expected.put(6L, ValueAndTimestamp.make("ABCU", 16L));
- expected.put(8L, ValueAndTimestamp.make("ABCDU", 18L));
- expected.put(9L, ValueAndTimestamp.make("ABCD", 18L));
- expected.put(11L, ValueAndTimestamp.make("BCD", 18L));
- expected.put(16L, ValueAndTimestamp.make("CD", 18L));
- expected.put(17L, ValueAndTimestamp.make("D", 18L));
- expected.put(20L, ValueAndTimestamp.make("E", 30L));
- expected.put(30L, ValueAndTimestamp.make("EF", 40L));
- expected.put(31L, ValueAndTimestamp.make("F", 40L));
- expected.put(45L, ValueAndTimestamp.make("G", 55L));
- expected.put(46L, ValueAndTimestamp.make("GH", 56L));
- expected.put(48L, ValueAndTimestamp.make("GHIJ", 58L));
+ // only non-expired records
expected.put(52L, ValueAndTimestamp.make("GHIJK", 62L));
expected.put(53L, ValueAndTimestamp.make("GHIJKLMN", 63L));
expected.put(56L, ValueAndTimestamp.make("HIJKLMN", 63L));
@@ -1675,6 +1660,7 @@ public class KStreamSlidingWindowAggregateTest {
expected.put(66L, ValueAndTimestamp.make("O", 76L));
expected.put(67L, ValueAndTimestamp.make("OP", 77L));
expected.put(70L, ValueAndTimestamp.make("OPQ", 80L));
+
} else {
expected.put(0L, ValueAndTimestamp.make("ARSTU", 10L));
expected.put(3L, ValueAndTimestamp.make("ASTU", 10L));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 465b66d188f..23b250e503c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -286,11 +286,12 @@ public class KStreamWindowAggregateTest {
inputTopic1.pipeInput("A", "1", 20L);
processors.get(0).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5,
15)), "0+1+1", 10),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5,
15)), "0+2+2", 13),
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5,
15)), "0+3", 14),
- new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5,
15)), "0+4", 12)
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5,
15)), "0+1+1", 10),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5,
15)), "0+2+2", 13),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(5,
15)), "0+3", 14),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5,
15)), "0+4", 12)
);
+
processors.get(1).checkAndClearProcessResult();
processors.get(2).checkAndClearProcessResult();
@@ -301,18 +302,24 @@ public class KStreamWindowAggregateTest {
inputTopic2.pipeInput("A", "a", 15L);
processors.get(0).checkAndClearProcessResult();
- processors.get(1).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0,
10)), "0+a", 0),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0,
10)), "0+b", 1),
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0,
10)), "0+c", 2)
- );
- processors.get(2).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0, 10)),
- "0+1+1%0+a", 9),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0, 10)),
- "0+2%0+b", 1),
- new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(0,
10)), "0+3%0+c",
- 2));
+
+ if (withCache) {
+ processors.get(1).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(0, 10)), "0+a", 0),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(0, 10)), "0+b", 1),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(0, 10)), "0+c", 2)
+ );
+ processors.get(2).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(0, 10)),
+ "0+1+1%0+a", 9),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(0, 10)),
+ "0+2%0+b", 1),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(0, 10)), "0+3%0+c",
+ 2));
+ } else {
+ processors.get(0).checkAndClearProcessResult();
+ processors.get(2).checkAndClearProcessResult();
+ }
inputTopic2.pipeInput("A", "a", 5L);
inputTopic2.pipeInput("B", "b", 6L);
@@ -321,11 +328,23 @@ public class KStreamWindowAggregateTest {
inputTopic2.pipeInput("A", "a", 21L);
processors.get(0).checkAndClearProcessResult();
- processors.get(1).checkAndClearProcessResult(
- new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5,
15)), "0+a", 5),
- new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5,
15)), "0+b", 6),
- new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(5,
15)), "0+d+d", 10)
- );
+ if (withCache) {
+ processors.get(1).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(5, 15)), "0+a", 5),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(5, 15)), "0+b", 6),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(5, 15)), "0+d+d", 10)
+ );
+ } else {
+ processors.get(1).checkAndClearProcessResult(
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(0, 10)), "0+a", 0),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(0, 10)), "0+b", 1),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(0, 10)), "0+c", 2),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(5, 15)), "0+a", 5),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(5, 15)), "0+b", 6),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(5, 15)), "0+d+d", 10)
+ );
+
+ }
processors.get(2).checkAndClearProcessResult(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5,
15)), "0+1+1%0+a",
10),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 41876581b38..14edfa861b9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -233,12 +233,21 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
final SessionStore<String, Long> store =
driver.getSessionStore("count-store");
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(store.fetch("1", "2"));
- assertThat(
- data,
- equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10,
15)), 2L),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600,
600)), 1L),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(599,
600)), 2L))));
+ if (!emitFinal) {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(10, 15)), 2L),
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(600, 600)), 1L),
+ KeyValue.pair(new Windowed<>("2", new
SessionWindow(599, 600)), 2L))));
+ } else {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(600, 600)), 1L),
+ KeyValue.pair(new Windowed<>("2", new
SessionWindow(599, 600)), 2L))));
+
+ }
}
}
@@ -251,12 +260,21 @@ public class SessionWindowedKStreamImplTest {
final SessionStore<String, String> sessionStore =
driver.getSessionStore("reduced");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
- assertThat(
- data,
- equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10,
15)), "1+2"),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600,
600)), "3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(599,
600)), "1+2"))));
+ if (!emitFinal) {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(10, 15)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(600, 600)), "3"),
+ KeyValue.pair(new Windowed<>("2", new
SessionWindow(599, 600)), "1+2"))));
+ } else {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(600, 600)), "3"),
+ KeyValue.pair(new Windowed<>("2", new
SessionWindow(599, 600)), "1+2"))));
+
+ }
}
}
@@ -272,12 +290,21 @@ public class SessionWindowedKStreamImplTest {
processData(driver);
final SessionStore<String, String> sessionStore =
driver.getSessionStore("aggregated");
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
- assertThat(
- data,
- equalTo(Arrays.asList(
- KeyValue.pair(new Windowed<>("1", new SessionWindow(10,
15)), "0+0+1+2"),
- KeyValue.pair(new Windowed<>("1", new SessionWindow(600,
600)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new SessionWindow(599,
600)), "0+0+1+2"))));
+ if (!emitFinal) {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(10, 15)), "0+0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(600, 600)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new
SessionWindow(599, 600)), "0+0+1+2"))));
+ } else {
+ assertThat(
+ data,
+ equalTo(Arrays.asList(
+ KeyValue.pair(new Windowed<>("1", new
SessionWindow(600, 600)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new
SessionWindow(599, 600)), "0+0+1+2"))));
+
+ }
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 5ac43ac8082..6aa4d17dca2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -51,6 +51,7 @@ import org.junit.Test;
import java.util.List;
import java.util.Properties;
+import java.util.Collections;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -198,6 +199,7 @@ public class TimeWindowedKStreamImplTest {
}
final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed
= supplier.theCapturedProcessor().processed();
+
if (emitFinal) {
assertEquals(
asList(
@@ -238,11 +240,26 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, Long>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
2L),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), 1L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), 2L),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), 1L))));
+ if (withCache) {
+ // with cache returns all records (expired from underneath
as well) as part of
+ // the merge process
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(0, 500)), 2L),
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), 2L),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), 1L))));
+ } else {
+ // without cache, we get only non-expired record from
underlying store.
+ if (!emitFinal) {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), 1L))));
+ } else {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), 1L),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), 2L),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), 1L))));
+ }
+ }
}
{
final WindowStore<String, ValueAndTimestamp<Long>> windowStore
=
@@ -250,11 +267,24 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>,
ValueAndTimestamp<Long>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
ValueAndTimestamp.make(2L, 15L)),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), ValueAndTimestamp.make(1L, 500L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), ValueAndTimestamp.make(2L, 550L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ // the same values and logic described above apply here as
well.
+ if (withCache) {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(0, 500)), ValueAndTimestamp.make(2L, 15L)),
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ } else {
+ if (!emitFinal) {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ } else {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), ValueAndTimestamp.make(1L, 500L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), ValueAndTimestamp.make(2L, 550L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make(1L, 1000L)))));
+ }
+ }
}
}
}
@@ -274,22 +304,37 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
"1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), "3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), "10+20"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), "30"))));
+ if (withCache) {
+ // with cache returns all records (expired from underneath
as well) as part of
+ // the merge process
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(0, 500)), "1+2"),
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), "3"),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), "10+20"),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), "30"))));
+ } else {
+ // without cache, we get only non-expired record from
underlying store.
+ // actualFrom = observedStreamTime(1500) -
retentionPeriod(1000) + 1 = 501.
+ // only 1 record is non expired and would be returned.
+ assertThat(data,
equalTo(Collections.singletonList(KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), "30"))));
+ }
}
{
final WindowStore<String, ValueAndTimestamp<String>>
windowStore = driver.getTimestampedWindowStore("reduced");
final List<KeyValue<Windowed<String>,
ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
ValueAndTimestamp.make("1+2", 15L)),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), ValueAndTimestamp.make("3", 500L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), ValueAndTimestamp.make("10+20", 550L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), ValueAndTimestamp.make("30", 1000L)))));
+ // same logic/data as explained above.
+ if (withCache) {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(0, 500)), ValueAndTimestamp.make("1+2", 15L)),
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), ValueAndTimestamp.make("3", 500L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), ValueAndTimestamp.make("10+20", 550L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+ } else {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make("30", 1000L)))));
+ }
}
}
}
@@ -310,22 +355,36 @@ public class TimeWindowedKStreamImplTest {
final List<KeyValue<Windowed<String>, String>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
"0+1+2"),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), "0+3"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), "0+10+20"),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), "0+30"))));
+ if (withCache) {
+ // with cache returns all records (expired from underneath
as well) as part of
+ // the merge process
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(0, 500)), "0+1+2"),
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), "0+3"),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), "0+10+20"),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), "0+30"))));
+ } else {
+ // without cache, we get only non-expired record from
underlying store.
+ // actualFrom = observedStreamTime(1500) -
retentionPeriod(1000) + 1 = 501.
+ // only 1 record is non expired and would be returned.
+ assertThat(data, equalTo(Collections
+ .singletonList(KeyValue.pair(new Windowed<>("2",
new TimeWindow(1000, 1500)), "0+30"))));
+ }
}
{
final WindowStore<String, ValueAndTimestamp<String>>
windowStore = driver.getTimestampedWindowStore("aggregated");
final List<KeyValue<Windowed<String>,
ValueAndTimestamp<String>>> data =
StreamsTestUtils.toList(windowStore.fetch("1", "2",
ofEpochMilli(0), ofEpochMilli(1000L)));
-
- assertThat(data, equalTo(asList(
- KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)),
ValueAndTimestamp.make("0+1+2", 15L)),
- KeyValue.pair(new Windowed<>("1", new TimeWindow(500,
1000)), ValueAndTimestamp.make("0+3", 500L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(500,
1000)), ValueAndTimestamp.make("0+10+20", 550L)),
- KeyValue.pair(new Windowed<>("2", new TimeWindow(1000,
1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+ if (withCache) {
+ assertThat(data, equalTo(asList(
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(0, 500)), ValueAndTimestamp.make("0+1+2", 15L)),
+ KeyValue.pair(new Windowed<>("1", new
TimeWindow(500, 1000)), ValueAndTimestamp.make("0+3", 500L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(500, 1000)), ValueAndTimestamp.make("0+10+20", 550L)),
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+ } else {
+ assertThat(data, equalTo(Collections.singletonList(
+ KeyValue.pair(new Windowed<>("2", new
TimeWindow(1000, 1500)), ValueAndTimestamp.make("0+30", 1000L)))));
+ }
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 586ff03d62f..c81e57589a0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -179,11 +179,10 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
- );
+ // For all tests, actualFrom is computed using observedStreamTime
- retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1
= 59001
+ // all records expired as actual from is 59001 and to is 1000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -191,11 +190,8 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0,
windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
+ // all records expired as actual from is 59001 and to is 1000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -203,20 +199,16 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
-
+ // all records expired as actual from is 59001 and to is 1000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.emptyList();
assertEquals(expected, toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // key B is expired as actual from is 59001
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -226,10 +218,8 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // keys A and B expired as actual from is 59001
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -251,10 +241,10 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
+ // For all tests, actualFrom is computed using observedStreamTime
- retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1
= 59001
+ // all records expired as actual from is 59001 and to = 1000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -262,11 +252,8 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0,
windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
+ // all records expired as actual from is 59001 and to = 1000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.emptyList();
assertEquals(expected, toList(values));
}
@@ -274,21 +261,17 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
+ // all records expired as actual from is 59001 and to = 1000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.emptyList();
assertEquals(expected, toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ // only 1 record left as actual from is 59001 and to = 60,000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -297,11 +280,9 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
null, null, 0, windows[3].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ // only 1 record left as actual from is 59001 and to = 60,000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -854,18 +835,24 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])),
expectedValue3);
bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])),
expectedValue4);
+ // Record expired as timestampFromRawKey = 1000 while
observedStreamTime = 60,000 and retention = 1000.
final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
key1, windows[0].start(), windows[0].end());
- assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1));
+ assertNull(value1);
+ // Record expired as timestampFromRawKey = 1000 while
observedStreamTime = 60,000 and retention = 1000.
final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
key1, windows[1].start(), windows[1].end());
- assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2));
+ assertNull(value2);
+ // expired record
+ // timestampFromRawKey = 1500 while observedStreamTime = 60,000 and
retention = 1000.
final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
key2, windows[2].start(), windows[2].end());
- assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3));
+ assertNull(value3);
+ // only non-expired record
+ // timestampFromRawKey = 60,000 while observedStreamTime = 60,000 and
retention = 1000.
final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore)
bytesStore).fetchSession(
key3, windows[3].start(), windows[3].end());
assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
@@ -991,10 +978,26 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
try (final KeyValueIterator<Bytes, byte[]> results =
bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 1,
2000)) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 20L)
- );
+ final List<KeyValue<Windowed<String>, Long>> expected;
+
+ // actual from: observedStreamTime - retention + 1
+ if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+ // For windowkeyschema, actual from is 1
+ // observed stream time = 1000. Retention Period = 1000.
+ // actual from = (1000 - 1000 + 1)
+ // and search happens in the range 1-2000
+ expected = asList(
+ KeyValue.pair(new Windowed<>(keyA, windows[0]),
10L),
+ KeyValue.pair(new Windowed<>(keyB, windows[2]),
20L)
+ );
+ } else {
+ // For session key schema, actual from is 501
+ // observed stream time = 1500. Retention Period = 1000.
+ // actual from = (1500 - 1000 + 1)
+ // and search happens in the range 501-2000
+ expected = Collections.singletonList(KeyValue.pair(new
Windowed<>(keyB, windows[2]), 20L));
+ }
+
assertEquals(expected, toList(results));
}
@@ -1010,11 +1013,27 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])),
serializeValue(10));
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])),
serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])),
serializeValue(100));
+ // actual from: observedStreamTime - retention + 1
+ // retention = 1000
try (final KeyValueIterator<Bytes, byte[]> results =
bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
- final List<KeyValue<Windowed<String>, Long>> expected = asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
- );
+
+ final List<KeyValue<Windowed<String>, Long>> expected;
+
+ // actual from: observedStreamTime - retention + 1
+ if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+ // For windowkeyschema, actual from is 1
+ // observed stream time = 1000. actual from = (1000 - 1000 + 1)
+ // and search happens in the range 1-999
+ expected = asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+ );
+ } else {
+ // For session key schema, actual from is 501
+ // observed stream time = 1500. actual from = (1500 - 1000 + 1)
+ // and search happens in the range 501-999, deeming the first
record as expired.
+ expected = Collections.singletonList(KeyValue.pair(new
Windowed<>(key, windows[1]), 50L));
+ }
assertEquals(expected, toList(results));
}
@@ -1054,15 +1073,24 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
+ // For all tests, actualFrom is computed using observedStreamTime -
retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 =
59001
+ // don't return expired records.
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
- KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
- ),
+ Collections.emptyList(),
results
);
+ final List<KeyValue<Windowed<String>, Long>> results1 =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 59000, 60000));
+
+ // only non expired record as actual from is 59001
+ assertEquals(
+ Collections.singletonList(
+ KeyValue.pair(new Windowed<>(key, windows[3]), 1000L)
+ ),
+ results1
+ );
+
segments.close();
}
@@ -1086,9 +1114,11 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
);
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
+ // actualFrom is computed using observedStreamTime - retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 =
59001
+ // only one record returned as actual from is 59001
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
),
results
@@ -1115,12 +1145,13 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
),
segmentDirs()
);
-
+ // For all tests, actualFrom is computed using observedStreamTime -
retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 =
59001
+ // key A expired as actual from is 59,001
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.backwardAll());
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 50L)
+ Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyB, windows[3]), 100L)
),
results
);
@@ -1147,9 +1178,11 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
);
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetchAll(0L, 60_000L));
+ // For all tests, actualFrom is computed using observedStreamTime -
retention + 1.
+ // so actualFrom = 60000(observedStreamTime) - 1000(retention) + 1 =
59001
+ // only 1 record fetched as actual from is 59001
assertEquals(
- asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@@ -1277,9 +1310,9 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(2, bytesStore.getSegments().size());
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
+ // after restoration, only 1 record should be returned as actual from
is 59001 and the prior record is expired.
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
assertEquals(expected, results);
}
@@ -1332,10 +1365,9 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
final String key = "a";
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
+ // after restoration, only non expired segments should be returned
which is one as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
assertEquals(expected, results);
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
@@ -1368,9 +1400,9 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
+
+ // only non expired record as actual from is 59001
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
@@ -1407,7 +1439,15 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
assertEquals(1, bytesStore.getSegments().size());
final String key = "a";
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+
+ // actual from = observedStreamTime - retention + 1.
+ // retention = 1000
+ if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
+ // For window stores, observedStreamTime = 1000 => actualFrom = 1
+ // For session stores, observedStreamTime = 1500 => actualFrom = 501
which deems
+ // the below record as expired.
+ expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+ }
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
assertEquals(expected, results);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 49ff61992f8..6e1b3bfcf8c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -171,44 +171,30 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying
actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0,
windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying
actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying
actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // Only 1 record not expired as observed stream time = 60000
implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -217,11 +203,9 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(
null, null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
+ // Only 1 record not expired as observed stream time = 60000
implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
@@ -242,44 +226,33 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying
actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0,
windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying
actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) {
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
- );
-
- assertEquals(expected, toList(values));
+ // All Records expired as observed stream time = 60000 implying
actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ assertEquals(Collections.emptyList(), toList(values));
}
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L)
+ // Only 1 record not expired as observed stream time = 60000
implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -287,12 +260,10 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
try (final KeyValueIterator<Bytes, byte[]> values =
bytesStore.backwardFetch(
null, null, 0, windows[3].start())) {
-
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L),
- KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L),
- KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L),
- KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L)
+ // Only 1 record not expired as observed stream time = 60000
implying actual-from = 59001 (60000 - 1000 + 1)
+ // for WindowKeySchema, to = 60000 while for SessionKeySchema, to
= 30000
+ final List<KeyValue<Windowed<String>, Long>> expected =
Collections.singletonList(
+ KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L)
);
assertEquals(expected, toList(values));
@@ -306,10 +277,18 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])),
serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])),
serializeValue(100));
try (final KeyValueIterator<Bytes, byte[]> results =
bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999)) {
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
- );
+ final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
+ /*
+ * For WindowKeySchema, the observedStreamTime is 1000, while for
+ * SessionKeySchema it is 1500. This changes the actual-from used while
+ * fetching: for SessionKeySchema the fetch happens from 501-999, while for
+ * WindowKeySchema it happens from 1-999, so 1 extra record gets returned.
+ */
+ if (schema instanceof SessionKeySchema) {
+ expected.add(KeyValue.pair(new Windowed<>(key, windows[1]),
50L));
+ } else {
+ expected.add(KeyValue.pair(new Windowed<>(key, windows[0]),
10L));
+ expected.add(KeyValue.pair(new Windowed<>(key, windows[1]),
50L));
+ }
assertEquals(expected, toList(results));
}
@@ -341,16 +320,13 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(Utils.mkSet(segments.segmentName(0),
segments.segmentName(1)), segmentDirs());
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
-
+ /*
+ * All records expired as observed stream time = 60,000 which sets
actual-from to 59001(60,000 - 1000 + 1). to = 1500.
+ */
assertEquals(
- Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
- KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
- ),
+ Collections.emptyList(),
results
);
-
segments.close();
}
@@ -371,11 +347,12 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
),
segmentDirs()
);
-
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from =
59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
assertEquals(
- Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@@ -401,11 +378,12 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
),
segmentDirs()
);
-
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from =
59001 (60000 - 1000 + 1) and to = 60,000.
+ */
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetchAll(0L, 60_000L));
assertEquals(
- Arrays.asList(
- KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ Collections.singletonList(
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
),
results
@@ -529,8 +507,10 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 2 segments are created during restoration.
assertEquals(2, bytesStore.getSegments().size());
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from =
59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
@@ -584,9 +564,10 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from =
59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
@@ -621,9 +602,10 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(2, bytesStore.getSegments().size());
final String key = "a";
+ /*
+ * Only 1 record returned. observed stream time = 60000, actual from =
59001 (60000 - 1000 + 1) and to = Long.MAX.
+ */
final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
- expected.add(new KeyValue<>(new Windowed<>(key, windows[2]), 100L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 200L));
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
@@ -659,11 +641,20 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 1 segments are created during restoration.
assertEquals(1, bytesStore.getSegments().size());
final String key = "a";
- final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
- expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+ /*
+ * For WindowKeySchema, the observedStreamTime is 1000, while for
+ * SessionKeySchema it is 1500. This changes the actual-from used while
+ * fetching: for SessionKeySchema the fetch happens from 501 to end, while
+ * for WindowKeySchema it happens from 1 to end, so 1 extra record gets returned.
+ */
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
- assertEquals(expected, results);
+ if (schema instanceof SessionKeySchema) {
+ assertEquals(Collections.emptyList(), results);
+ } else {
+ final List<KeyValue<Windowed<String>, Long>> expected = new
ArrayList<>();
+ expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+ assertEquals(expected, results);
+ }
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
assertThat(bytesStore.getPosition().getPartitionPositions("A"),
hasEntry(0, 2L));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index efbcd8855a1..56c77ce4fe1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import java.util.ArrayList;
import java.util.Arrays;
@@ -58,6 +59,7 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.toList;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -913,7 +915,13 @@ public abstract class AbstractSessionBytesStoreTest {
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
) {
- assertEquals(valuesToSet(iterator), new
HashSet<>(Arrays.asList(2L, 3L, 4L)));
+ if (getStoreType() == StoreType.InMemoryStore) {
+ assertEquals(valuesToSet(iterator), new
HashSet<>(Arrays.asList(2L, 3L, 4L)));
+ } else {
+ // The 2 records with values 2L and 3L are considered expired
as
+ // their end times < observed stream time - retentionPeriod +
1.
+ Assertions.assertEquals(valuesToSet(iterator), new
HashSet<>(Collections.singletonList(4L)));
+ }
}
}
@@ -934,4 +942,43 @@ public abstract class AbstractSessionBytesStoreTest {
final Position actual = sessionStore.getPosition();
assertThat(expected, is(actual));
}
+
+ @Test
+ public void shouldNotFetchExpiredSessions() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime - 3
* RETENTION_PERIOD, systemTime - 2 * RETENTION_PERIOD)), 1L);
+ sessionStore.put(new Windowed<>("q", new SessionWindow(systemTime - 2
* RETENTION_PERIOD, systemTime - RETENTION_PERIOD)), 4L);
+ sessionStore.put(new Windowed<>("r", new SessionWindow(systemTime -
RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 3L);
+ sessionStore.put(new Windowed<>("p", new SessionWindow(systemTime -
RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)), 2L);
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", systemTime - 2 *
RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+ ) {
+ Assertions.assertEquals(mkSet(2L), valuesToSet(iterator));
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.backwardFindSessions("p", systemTime - 5 *
RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
+ ) {
+ Assertions.assertFalse(iterator.hasNext());
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", "r", systemTime - 5 *
RETENTION_PERIOD, systemTime - 4 * RETENTION_PERIOD)
+ ) {
+ Assertions.assertFalse(iterator.hasNext());
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", "r", systemTime -
RETENTION_PERIOD, systemTime - RETENTION_PERIOD / 2)
+ ) {
+ Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.findSessions("p", "r", systemTime - 2 *
RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+ ) {
+ Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+ }
+ try (final KeyValueIterator<Windowed<String>, Long> iterator =
+ sessionStore.backwardFindSessions("p", "r", systemTime -
2 * RETENTION_PERIOD, systemTime - RETENTION_PERIOD)
+ ) {
+ Assertions.assertEquals(valuesToSet(iterator), mkSet(2L, 3L, 4L));
+ }
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index dbd46111fbb..d29c6bf88d4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -342,7 +342,7 @@ public abstract class AbstractWindowBytesStoreTest {
);
assertEquals(
asList(zero, one, two, three),
- toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0),
ofEpochMilli(defaultStartTime + 3)))
+ toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime),
ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
asList(one, two, three, four, five),
@@ -360,7 +360,7 @@ public abstract class AbstractWindowBytesStoreTest {
);
assertEquals(
asList(three, two, one, zero),
- toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime
+ 0), ofEpochMilli(defaultStartTime + 3)))
+
toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime),
ofEpochMilli(defaultStartTime + 3)))
);
assertEquals(
asList(five, four, three, two, one),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index ead362a7b39..27fddc9b075 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -101,7 +101,8 @@ public class CachingPersistentWindowStoreTest {
@Before
public void setUp() {
keySchema = new WindowKeySchema();
- bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope",
0, SEGMENT_INTERVAL, keySchema);
+ // KAFKA-12960: Adding a retention of 100 ms to make all test cases
work as is.
+ bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope",
100, SEGMENT_INTERVAL, keySchema);
underlyingStore = new RocksDBWindowStore(bytesStore, false,
WINDOW_SIZE);
final TimeWindowedDeserializer<String> keyDeserializer = new
TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
keyDeserializer.setIsChangelogTopic(true);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 922608d4994..044f9484378 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -95,6 +95,7 @@ public class MeteredSessionStoreTest {
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final long START_TIMESTAMP = 24L;
private static final long END_TIMESTAMP = 42L;
+ private static final int RETENTION_PERIOD = 100;
private final String threadId = Thread.currentThread().getName();
private final TaskId taskId = new TaskId(0, 0, "My-Topology");
@@ -429,6 +430,54 @@ public class MeteredSessionStoreTest {
verify(innerStore);
}
+ @Test
+ public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.findSessions(KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
+ .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void
shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
+ .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void shouldNotFindExpiredSessionRangeFromStore() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
+ .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() {
+ final long systemTime = Time.SYSTEM.milliseconds();
+ expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES,
systemTime - RETENTION_PERIOD, systemTime))
+ .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ init();
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
@Test
public void shouldRecordRestoreTimeOnInit() {
init();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index ca6a518eb4c..20eb5ec88a1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
+import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -82,6 +83,7 @@ public class MeteredWindowStoreTest {
private static final String VALUE = "value";
private static final byte[] VALUE_BYTES = VALUE.getBytes();
private static final int WINDOW_SIZE_MS = 10;
+ private static final int RETENTION_PERIOD = 100;
private static final long TIMESTAMP = 42L;
private final String threadId = Thread.currentThread().getName();
@@ -270,6 +272,18 @@ public class MeteredWindowStoreTest {
verify(innerStoreMock);
}
+ @Test
+ public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
+ expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 +
RETENTION_PERIOD))
+ .andReturn(KeyValueIterators.emptyWindowStoreIterator());
+ replay(innerStoreMock);
+
+ store.init((StateStoreContext) context, store);
+ store.fetch("a", ofEpochMilli(1),
ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded
on close;
+
+ verify(innerStoreMock);
+ }
+
@Test
public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c0c7e963e6e..d2f5289a1ad 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -187,19 +187,40 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
),
segmentDirs(baseDir)
);
-
+ // For all tests, for WindowStore actualFrom is computed using
observedStreamTime - retention + 1.
+ // while for TimeOrderedWindowStores, actualFrom = observedStreamTime
- retention
+ // expired record
assertEquals(
- new HashSet<>(Collections.singletonList("zero")),
+ new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
0,
ofEpochMilli(startTime - WINDOW_SIZE),
ofEpochMilli(startTime + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("one")),
- valuesToSet(windowStore.fetch(
- 1,
- ofEpochMilli(startTime + increment - WINDOW_SIZE),
- ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+ // RocksDBWindowStore =>
+ // from = 149997
+ // to = 150003
+ // actualFrom = 150001
+ // record one's timestamp is 150,000, so it's ignored.
+ // RocksDBTimeOrderedWindowStore*Index =>
+ // from = 149997
+ // to = 150003
+ // actualFrom = 150000, hence not ignored
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 1,
+ ofEpochMilli(startTime + increment - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+
+ } else {
+ assertEquals(
+ new HashSet<>(Collections.singletonList("one")),
+ valuesToSet(windowStore.fetch(
+ 1,
+ ofEpochMilli(startTime + increment - WINDOW_SIZE),
+ ofEpochMilli(startTime + increment + WINDOW_SIZE))));
+ }
assertEquals(
new HashSet<>(Collections.singletonList("two")),
valuesToSet(windowStore.fetch(
@@ -247,12 +268,32 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
1,
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("two")),
- valuesToSet(windowStore.fetch(
- 2,
- ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
- ofEpochMilli(startTime + increment * 2 + WINDOW_SIZE))));
+ // RocksDBWindowStore =>
+ // from = 179997
+ // to = 180003
+ // actualFrom = 180001
+ // record two's timestamp is 180,000, so it's ignored.
+ // RocksDBTimeOrderedWindowStore*Index =>
+ // from = 179997
+ // to = 180003
+ // actualFrom = 180000, hence not ignored
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ // expired record
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 2,
+ ofEpochMilli(startTime + increment * 2 -
WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 2 +
WINDOW_SIZE))));
+ } else {
+ assertEquals(
+ // not expired for the time-ordered stores (actualFrom has no +1)
+ new HashSet<>(Collections.singletonList("two")),
+ valuesToSet(windowStore.fetch(
+ 2,
+ ofEpochMilli(startTime + increment * 2 -
WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 2 +
WINDOW_SIZE))));
+ }
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
@@ -301,7 +342,8 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment - WINDOW_SIZE),
ofEpochMilli(startTime + increment + WINDOW_SIZE))));
assertEquals(
- new HashSet<>(Collections.singletonList("two")),
+ // expired record
+ new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
2,
ofEpochMilli(startTime + increment * 2 - WINDOW_SIZE),
@@ -371,12 +413,24 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
3,
ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("four")),
- valuesToSet(windowStore.fetch(
- 4,
- ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
- ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ // expired record
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 -
WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 +
WINDOW_SIZE))));
+ } else {
+ assertEquals(
+ // not expired for the time-ordered stores
+ new HashSet<>(Collections.singletonList("four")),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 -
WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 +
WINDOW_SIZE))));
+
+ }
assertEquals(
new HashSet<>(Collections.singletonList("five")),
valuesToSet(windowStore.fetch(
@@ -465,7 +519,14 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
iter.next();
fetchedCount++;
}
- assertEquals(2, fetchedCount);
+ // 1 extra record is expired in the case of RocksDBWindowStore as
+ // actualFrom = observedStreamTime - retentionPeriod + 1. The +1
+ // isn't present for RocksDbTimeOrderedStoreWith*Index
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(1, fetchedCount);
+ } else {
+ assertEquals(2, fetchedCount);
+ }
assertEquals(
Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
@@ -480,8 +541,10 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
iter.next();
fetchedCount++;
}
- assertEquals(1, fetchedCount);
+ // the latest record has a timestamp > 60k. So, the +1 in the actualFrom calculation in
+ // RocksDBWindowStore shouldn't have an implication and all stores should return the same fetched counts.
+ assertEquals(1, fetchedCount);
assertEquals(
Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
segmentDirs(baseDir)
@@ -564,6 +627,9 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
+ // For all tests, for WindowStore actualFrom is computed using observedStreamTime - retention + 1,
+ // while for TimeOrderedWindowStores, actualFrom = observedStreamTime - retention.
+
assertEquals(
new HashSet<>(Collections.emptyList()),
valuesToSet(windowStore.fetch(
@@ -650,12 +716,31 @@ public class RocksDBWindowStoreTest extends
AbstractWindowBytesStoreTest {
3,
ofEpochMilli(startTime + increment * 3 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 3 + WINDOW_SIZE))));
- assertEquals(
- new HashSet<>(Collections.singletonList("four")),
- valuesToSet(windowStore.fetch(
- 4,
- ofEpochMilli(startTime + increment * 4 - WINDOW_SIZE),
- ofEpochMilli(startTime + increment * 4 + WINDOW_SIZE))));
+ // RocksDBWindowStore =>
+ // from = 239,997
+ // to = 240,003
+ // actualFrom = 240,001
+ // record four's timestamp is 240,000, so it's ignored.
+ // RocksDBTimeOrderedWindowStore*Index =>
+ // from = 239,997
+ // to = 240,003
+ // actualFrom = 240,000, hence not ignored
+ if (storeType == StoreType.RocksDBWindowStore) {
+ assertEquals(
+ new HashSet<>(Collections.emptyList()),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 -
WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 +
WINDOW_SIZE))));
+ } else {
+ assertEquals(
+ new HashSet<>(Collections.singletonList("four")),
+ valuesToSet(windowStore.fetch(
+ 4,
+ ofEpochMilli(startTime + increment * 4 -
WINDOW_SIZE),
+ ofEpochMilli(startTime + increment * 4 +
WINDOW_SIZE))));
+
+ }
assertEquals(
new HashSet<>(Collections.singletonList("five")),
valuesToSet(windowStore.fetch(