This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b5d4fa7645e KAFKA-13785: [10/N][emit final] more unit test for session 
store and disable cache for emit final sliding window (#12370)
b5d4fa7645e is described below

commit b5d4fa7645eb75d2030eb8cac78545a681686a39
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Tue Jul 12 10:57:11 2022 -0700

    KAFKA-13785: [10/N][emit final] more unit test for session store and 
disable cache for emit final sliding window (#12370)
    
    1. Added more unit test for RocksDBTimeOrderedSessionStore and 
RocksDBTimeOrderedSessionSegmentedBytesStore
    2. Disable cache for sliding window if emit strategy is ON_WINDOW_CLOSE
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../internals/SessionWindowedKStreamImpl.java      |   3 +
 .../internals/SlidingWindowedKStreamImpl.java      |   4 +-
 .../state/internals/PrefixedSessionKeySchemas.java |  14 +--
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |   8 +-
 .../internals/RocksDBTimeOrderedSessionStore.java  |   8 +-
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 138 +++++++++++++++++++++
 .../internals/AbstractSessionBytesStoreTest.java   | 124 ++++++++++++++++++
 .../state/internals/InMemorySessionStoreTest.java  |  41 +-----
 .../state/internals/RocksDBSessionStoreTest.java   |  57 +--------
 9 files changed, 291 insertions(+), 106 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index c3b05cb1182..8c60019fccb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -289,7 +289,10 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();
         }
+
         return builder;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 70c75b4c82e..5ca6b911b7c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -258,7 +258,9 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         } else {
             builder.withLoggingDisabled();
         }
-        if (materialized.cachingEnabled()) {
+
+        // do not enable cache if the emit final strategy is used
+        if (materialized.cachingEnabled() && emitStrategy.type() != 
StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
         } else {
             builder.withCachingDisabled();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index 2ac25277ba8..3ce00bcb8a9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -102,8 +102,8 @@ public class PrefixedSessionKeySchemas {
         @Override
         public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
                                                  final Bytes binaryKeyTo,
-                                                 final long from,
-                                                 final long to,
+                                                 final long 
earliestWindowEndTime,
+                                                 final long 
latestWindowStartTime,
                                                  final boolean forward) {
             return iterator -> {
                 while (iterator.hasNext()) {
@@ -120,13 +120,13 @@ public class PrefixedSessionKeySchemas {
 
                     // We can return false directly here since keys are sorted 
by end time and if
                     // we get time smaller than `from`, there won't be time 
within range.
-                    if (!forward && endTime < from) {
+                    if (!forward && endTime < earliestWindowEndTime) {
                         return false;
                     }
 
                     if ((binaryKeyFrom == null || 
windowedKey.key().compareTo(binaryKeyFrom) >= 0)
                         && (binaryKeyTo == null || 
windowedKey.key().compareTo(binaryKeyTo) <= 0)
-                        && endTime >= from && startTime <= to) {
+                        && endTime >= earliestWindowEndTime && startTime <= 
latestWindowStartTime) {
                         return true;
                     }
                     iterator.next();
@@ -137,10 +137,10 @@ public class PrefixedSessionKeySchemas {
 
         @Override
         public <S extends Segment> List<S> segmentsToSearch(final Segments<S> 
segments,
-                                                            final long from,
-                                                            final long to,
+                                                            final long 
earliestWindowEndTime,
+                                                            final long 
latestWindowStartTime,
                                                             final boolean 
forward) {
-            return segments.segments(from, Long.MAX_VALUE, forward);
+            return segments.segments(earliestWindowEndTime, Long.MAX_VALUE, 
forward);
         }
 
         static long extractStartTimestamp(final byte[] binaryKey) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index 172d3218818..59e255443c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -63,12 +63,12 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
     }
 
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         return get(TimeFirstSessionKeySchema.toBinary(
             key,
-            earliestSessionEndTime,
-            latestSessionStartTime
+            sessionStartTime,
+            sessionEndTime
         ));
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index deb6028ef68..62a874f06c6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -122,12 +122,12 @@ public class RocksDBTimeOrderedSessionStore
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessiontEndTime) {
         return wrapped().fetchSession(
             key,
-            earliestSessionEndTime,
-            latestSessionStartTime
+            sessionStartTime,
+            sessiontEndTime
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 0641392b2a3..3644e8eaa6d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -91,6 +91,7 @@ import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -827,6 +828,143 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         }
     }
 
+    @Test
+    public void shouldFetchSessionForSingleKey() {
+        // Only for TimeFirstSessionKeySchema schema
+        if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+            return;
+        }
+
+        final String keyA = "a";
+        final String keyB = "b";
+        final String keyC = "c";
+
+        final StateSerdes<String, Long> stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+        final Bytes key1 = 
Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA));
+        final Bytes key2 = 
Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB));
+        final Bytes key3 = 
Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC));
+
+        final byte[] expectedValue1 = serializeValue(10);
+        final byte[] expectedValue2 = serializeValue(50);
+        final byte[] expectedValue3 = serializeValue(100);
+        final byte[] expectedValue4 = serializeValue(200);
+
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), 
expectedValue1);
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), 
expectedValue2);
+        bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), 
expectedValue3);
+        bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), 
expectedValue4);
+
+        final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+            key1, windows[0].start(), windows[0].end());
+        assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1));
+
+        final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+            key1, windows[1].start(), windows[1].end());
+        assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2));
+
+        final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+            key2, windows[2].start(), windows[2].end());
+        assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3));
+
+        final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+            key3, windows[3].start(), windows[3].end());
+        assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4));
+
+        final byte[] noValue = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(
+            key3, 2000, 3000);
+        assertNull(noValue);
+    }
+
+    @Test
+    public void shouldFetchSessionForTimeRange() {
+        // Only for TimeFirstSessionKeySchema schema
+        if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+            return;
+        }
+        final String keyA = "a";
+        final String keyB = "b";
+        final String keyC = "c";
+
+        final Window[] sessionWindows = new Window[4];
+        sessionWindows[0] = new SessionWindow(100L, 100L);
+        sessionWindows[1] = new SessionWindow(50L, 200L);
+        sessionWindows[2] = new SessionWindow(200L, 300L);
+        bytesStore.put(serializeKey(new Windowed<>(keyA, sessionWindows[0])), 
serializeValue(10));
+        bytesStore.put(serializeKey(new Windowed<>(keyB, sessionWindows[1])), 
serializeValue(100));
+        bytesStore.put(serializeKey(new Windowed<>(keyC, sessionWindows[2])), 
serializeValue(200));
+
+
+        // Fetch point
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 
100L)) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = 
Collections.singletonList(
+                KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        // Fetch partial boundary
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 
200L)) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        // Fetch partial
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(99L, 
201L)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        // Fetch partial
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 
199L)) {
+            assertTrue(toList(values).isEmpty());
+        }
+
+        // Fetch all boundary
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 
300L)) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
+                KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        // Fetch all
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(99L, 
301L)) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L),
+                KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L),
+                KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+
+        // Fetch all
+        try (final KeyValueIterator<Bytes, byte[]> values = 
((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 
299L)) {
+
+            final List<KeyValue<Windowed<String>, Long>> expected = 
Collections.singletonList(
+                KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L)
+            );
+
+            assertEquals(expected, toList(values));
+        }
+    }
+
     @Test
     public void shouldSkipAndRemoveDanglingIndex() {
         final String keyA = "a";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 6e93f6a7ba1..78d7f08ee84 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -32,7 +33,9 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -60,6 +63,7 @@ import static 
org.apache.kafka.test.StreamsTestUtils.valuesToSet;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -73,6 +77,13 @@ public abstract class AbstractSessionBytesStoreTest {
     static final long SEGMENT_INTERVAL = 60_000L;
     static final long RETENTION_PERIOD = 10_000L;
 
+    enum StoreType {
+        RocksDBSessionStore,
+        RocksDBTimeOrderedSessionStoreWithIndex,
+        RocksDBTimeOrderedSessionStoreWithoutIndex,
+        InMemoryStore
+    }
+
     SessionStore<String, Long> sessionStore;
 
     private MockRecordCollector recordCollector;
@@ -83,6 +94,8 @@ public abstract class AbstractSessionBytesStoreTest {
                                                          final Serde<K> 
keySerde,
                                                          final Serde<V> 
valueSerde);
 
+    abstract StoreType getStoreType();
+
     @Before
     public void setUp() {
         sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), 
Serdes.Long());
@@ -179,6 +192,75 @@ public abstract class AbstractSessionBytesStoreTest {
         }
     }
 
+    @Test
+    public void shouldFindSessionsForTimeRange() {
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 5L);
+
+        if (getStoreType() == StoreType.RocksDBSessionStore) {
+            assertThrows(
+                "This API is not supported by this implementation of 
SessionStore.",
+                UnsupportedOperationException.class,
+                () -> sessionStore.findSessions(0, 0)
+            );
+            return;
+        }
+
+        // Find point
+        try (final KeyValueIterator<Windowed<String>, Long> values = 
sessionStore.findSessions(0, 0)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = 
Collections.singletonList(
+                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L)
+            );
+            assertEquals(expected, toList(values));
+        }
+
+        sessionStore.put(new Windowed<>("b", new SessionWindow(10, 20)), 10L);
+        sessionStore.put(new Windowed<>("c", new SessionWindow(30, 40)), 20L);
+
+        // Find boundary
+        try (final KeyValueIterator<Windowed<String>, Long> values = 
sessionStore.findSessions(0, 20)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 
5L),
+                KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 
10L)
+            );
+            assertEquals(expected, toList(values));
+        }
+
+        // Find left boundary
+        try (final KeyValueIterator<Windowed<String>, Long> values = 
sessionStore.findSessions(0, 19)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = 
Collections.singletonList(
+                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L)
+            );
+            assertEquals(expected, toList(values));
+        }
+
+        // Find right boundary
+        try (final KeyValueIterator<Windowed<String>, Long> values = 
sessionStore.findSessions(1, 20)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = 
Collections.singletonList(
+                KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 
10L)
+            );
+            assertEquals(expected, toList(values));
+        }
+
+        // Find partial off by 1
+        try (final KeyValueIterator<Windowed<String>, Long> values = 
sessionStore.findSessions(19, 41)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 
10L),
+                KeyValue.pair(new Windowed<>("c", new SessionWindow(30, 40)), 
20L)
+            );
+            assertEquals(expected, toList(values));
+        }
+
+        // Find all boundary
+        try (final KeyValueIterator<Windowed<String>, Long> values = 
sessionStore.findSessions(0, 40)) {
+            final List<KeyValue<Windowed<String>, Long>> expected = asList(
+                KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 
5L),
+                KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 
10L),
+                KeyValue.pair(new Windowed<>("c", new SessionWindow(30, 40)), 
20L)
+            );
+            assertEquals(expected, toList(values));
+        }
+    }
+
     @Test
     public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
         final LinkedList<KeyValue<Windowed<String>, Long>> expected = new 
LinkedList<>();
@@ -810,4 +892,46 @@ public abstract class AbstractSessionBytesStoreTest {
             );
         }
     }
+
+    @Test
+    public void shouldRemoveExpired() {
+        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
+        if (getStoreType() == StoreType.InMemoryStore) {
+            sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 
2L);
+            sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 
3L);
+
+            // Advance stream time to expire the first record
+            sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 
RETENTION_PERIOD)), 4L);
+        } else {
+            sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 
SEGMENT_INTERVAL)), 2L);
+            sessionStore.put(new Windowed<>("a", new SessionWindow(10, 
SEGMENT_INTERVAL)), 3L);
+
+            // Advance stream time to expire the first record
+            sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * 
SEGMENT_INTERVAL)), 4L);
+        }
+
+        try (final KeyValueIterator<Windowed<String>, Long> iterator =
+            sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
+        ) {
+            assertEquals(valuesToSet(iterator), new 
HashSet<>(Arrays.asList(2L, 3L, 4L)));
+        }
+    }
+
+    @Test
+    public void shouldMatchPositionAfterPut() {
+        final MeteredSessionStore<String, Long> meteredSessionStore = 
(MeteredSessionStore<String, Long>) sessionStore;
+        final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = 
(ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
+        final SessionStore wrapped = (SessionStore) 
changeLoggingSessionBytesStore.wrapped();
+
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new 
RecordHeaders()));
+        sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 
1L);
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new 
RecordHeaders()));
+        sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 
2L);
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new 
RecordHeaders()));
+        sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 
3L);
+
+        final Position expected = Position.fromMap(mkMap(mkEntry("", 
mkMap(mkEntry(0, 3L)))));
+        final Position actual = sessionStore.getPosition();
+        assertThat(expected, is(actual));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
index 7821e2c0216..8546c546716 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
@@ -16,12 +16,9 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.Stores;
@@ -31,13 +28,9 @@ import java.util.Arrays;
 import java.util.HashSet;
 
 import static java.time.Duration.ofMillis;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.hamcrest.Matchers.is;
 
 public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest {
 
@@ -55,20 +48,8 @@ public class InMemorySessionStoreTest extends 
AbstractSessionBytesStoreTest {
             valueSerde).build();
     }
 
-    @Test
-    public void shouldRemoveExpired() {
-        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
-        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
-        sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
-
-        // Advance stream time to expire the first record
-        sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 
RETENTION_PERIOD)), 4L);
-
-        try (final KeyValueIterator<Windowed<String>, Long> iterator =
-            sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
-        ) {
-            assertEquals(valuesToSet(iterator), new 
HashSet<>(Arrays.asList(2L, 3L, 4L)));
-        }
+    StoreType getStoreType() {
+        return StoreType.InMemoryStore;
     }
 
     @Test
@@ -90,22 +71,4 @@ public class InMemorySessionStoreTest extends 
AbstractSessionBytesStoreTest {
         assertFalse(sessionStore.findSessions("a", "b", 0L, 20L).hasNext());
     }
 
-    @Test
-    public void shouldMatchPositionAfterPut() {
-        final MeteredSessionStore<String, Long> meteredSessionStore = 
(MeteredSessionStore<String, Long>) sessionStore;
-        final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = 
(ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
-        final InMemorySessionStore inMemorySessionStore = 
(InMemorySessionStore) changeLoggingSessionBytesStore.wrapped();
-
-        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new 
RecordHeaders()));
-        sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 
1L);
-        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new 
RecordHeaders()));
-        sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 
2L);
-        context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new 
RecordHeaders()));
-        sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 
3L);
-
-        final Position expected = Position.fromMap(mkMap(mkEntry("", 
mkMap(mkEntry(0, 3L)))));
-        final Position actual = inMemorySessionStore.getPosition();
-        assertThat(expected, is(actual));
-    }
-
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index b3a749a8a37..8a849d86bcb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -17,44 +17,27 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.util.Collection;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.Stores;
-import org.junit.Test;
 
-import java.util.HashSet;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
-import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
 
     private static final String STORE_NAME = "rocksDB session store";
 
-    enum StoreType {
-        RocksDBSessionStore,
-        RocksDBTimeOrderedSessionStoreWithIndex,
-        RocksDBTimeOrderedSessionStoreWithoutIndex
-    }
     @Parameter
     public StoreType storeType;
 
     @Parameterized.Parameters(name = "{0}")
-    public static Collection<Object[]> getKeySchema() {
+    public static Collection<Object[]> getParamStoreType() {
         return asList(new Object[][] {
             {StoreType.RocksDBSessionStore},
             {StoreType.RocksDBTimeOrderedSessionStoreWithIndex},
@@ -62,6 +45,11 @@ public class RocksDBSessionStoreTest extends 
AbstractSessionBytesStoreTest {
         });
     }
 
+    @Override
+    StoreType getStoreType() {
+        return storeType;
+    }
+
     @Override
     <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
                                                  final Serde<K> keySerde,
@@ -102,37 +90,4 @@ public class RocksDBSessionStoreTest extends 
AbstractSessionBytesStoreTest {
         }
     }
 
-    @Test
-    public void shouldRemoveExpired() {
-        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
-        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 
SEGMENT_INTERVAL)), 2L);
-        sessionStore.put(new Windowed<>("a", new SessionWindow(10, 
SEGMENT_INTERVAL)), 3L);
-
-        // Advance stream time to expire the first record
-        sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * 
SEGMENT_INTERVAL)), 4L);
-
-        try (final KeyValueIterator<Windowed<String>, Long> iterator =
-            sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE)
-        ) {
-            assertEquals(valuesToSet(iterator), new HashSet<>(asList(2L, 3L, 
4L)));
-        }
-    }
-
-    @Test
-    public void shouldMatchPositionAfterPut() {
-        final MeteredSessionStore<String, Long> meteredSessionStore = 
(MeteredSessionStore<String, Long>) sessionStore;
-        final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = 
(ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
-        final WrappedStateStore rocksDBSessionStore = (WrappedStateStore) 
changeLoggingSessionBytesStore.wrapped();
-
-        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new 
RecordHeaders()));
-        sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 
1L);
-        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new 
RecordHeaders()));
-        sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 
SEGMENT_INTERVAL)), 2L);
-        context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new 
RecordHeaders()));
-        sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 
SEGMENT_INTERVAL)), 3L);
-
-        final Position expected = Position.fromMap(mkMap(mkEntry("", 
mkMap(mkEntry(0, 3L)))));
-        final Position actual = rocksDBSessionStore.getPosition();
-        assertEquals(expected, actual);
-    }
 }
\ No newline at end of file

Reply via email to