This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 814fbe0 MINOR: Remove 1 minute minimum segment interval (#5323)
814fbe0 is described below
commit 814fbe0feabea0f78b690f44ee99b61b07ea7dd2
Author: John Roesler <[email protected]>
AuthorDate: Wed Aug 1 13:01:12 2018 -0500
MINOR: Remove 1 minute minimum segment interval (#5323)
* new minimum is 0, just like window size
* refactor tests to use smaller segment sizes as well
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>
---
.../org/apache/kafka/streams/state/Stores.java | 14 +-
.../org/apache/kafka/streams/state/StoresTest.java | 31 ++--
.../state/internals/CachingSessionStoreTest.java | 132 ++++++++--------
.../state/internals/CachingWindowStoreTest.java | 97 +++++++-----
.../internals/RocksDBSegmentedBytesStoreTest.java | 169 +++++++++++++--------
.../state/internals/RocksDBWindowStoreTest.java | 34 ++---
.../streams/state/internals/SegmentsTest.java | 81 +++++-----
7 files changed, 318 insertions(+), 240 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c1b81c6..03eaa07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -29,8 +29,6 @@ import
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier
import
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Objects;
@@ -74,8 +72,6 @@ import java.util.Objects;
@InterfaceStability.Evolving
public class Stores {
- private static final Logger log = LoggerFactory.getLogger(Stores.class);
-
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
@@ -195,7 +191,7 @@ public class Stores {
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store
(cannot be negative)
- * @param segmentInterval size of segments in ms (must be at least
one minute)
+ * @param segmentInterval size of segments in ms (cannot be negative)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
@@ -206,14 +202,14 @@ public class Stores {
final boolean
retainDuplicates,
final long
segmentInterval) {
Objects.requireNonNull(name, "name cannot be null");
- if (retentionPeriod < 0) {
+ if (retentionPeriod < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be
negative");
}
- if (windowSize < 0) {
+ if (windowSize < 0L) {
throw new IllegalArgumentException("windowSize cannot be
negative");
}
- if (segmentInterval < 60_000) {
- throw new IllegalArgumentException("segmentInterval must be at
least one minute");
+ if (segmentInterval < 1L) {
+ throw new IllegalArgumentException("segmentInterval cannot be zero
or negative");
}
return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod,
segmentInterval, windowSize, retainDuplicates);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 23f246d..d0da158 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -38,6 +38,7 @@ public class StoresTest {
@Test(expected = NullPointerException.class)
public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
+ //noinspection ResultOfMethodCallIgnored
Stores.inMemoryKeyValueStore(null);
}
@@ -53,12 +54,12 @@ public class StoresTest {
@Test(expected = NullPointerException.class)
public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
- Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
+ Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative()
{
- Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
+ Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
}
@Deprecated
@@ -74,7 +75,7 @@ public class StoresTest {
@Test(expected = IllegalArgumentException.class)
public void
shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
- Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
+ Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
}
@Test(expected = NullPointerException.class)
@@ -129,25 +130,31 @@ public class StoresTest {
@Test
public void shouldBuildWindowStore() {
- final WindowStore<String, String> store =
Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
-
Serdes.String(),
-
Serdes.String()).build();
+ final WindowStore<String, String> store = Stores.windowStoreBuilder(
+ Stores.persistentWindowStore("store", 3L, 3L, true),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildKeyValueStore() {
- final KeyValueStore<String, String> store =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
-
Serdes.String(),
-
Serdes.String()).build();
+ final KeyValueStore<String, String> store =
Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("name"),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildSessionStore() {
- final SessionStore<String, String> store =
Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
-
Serdes.String(),
-
Serdes.String()).build();
+ final SessionStore<String, String> store = Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore("name", 10),
+ Serdes.String(),
+ Serdes.String()
+ ).build();
assertThat(store, not(nullValue()));
}
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 194edb1..47e79c9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -38,9 +37,12 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
@@ -49,14 +51,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+@SuppressWarnings("PointlessArithmeticExpression")
public class CachingSessionStoreTest {
private static final int MAX_CACHE_SIZE_BYTES = 600;
- private InternalMockProcessorContext context;
+ private static final Long DEFAULT_TIMESTAMP = 10L;
+ private static final long SEGMENT_INTERVAL = 100L;
private RocksDBSegmentedBytesStore underlying;
private CachingSessionStore<String, String> cachingStore;
private ThreadCache cache;
- private static final Long DEFAULT_TIMESTAMP = 10L;
private final Bytes keyA = Bytes.wrap("a".getBytes());
private final Bytes keyAA = Bytes.wrap("aa".getBytes());
private final Bytes keyB = Bytes.wrap("b".getBytes());
@@ -65,17 +68,11 @@ public class CachingSessionStoreTest {
public void setUp() {
final SessionKeySchema schema = new SessionKeySchema();
schema.init("topic");
- final int retention = 60000;
- final int segmentInterval = 60_000;
- underlying = new RocksDBSegmentedBytesStore("test", retention,
segmentInterval, schema);
+ underlying = new RocksDBSegmentedBytesStore("test", 0L,
SEGMENT_INTERVAL, schema);
final RocksDBSessionStore<Bytes, byte[]> sessionStore = new
RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
- cachingStore = new CachingSessionStore<>(sessionStore,
- Serdes.String(),
- Serdes.String(),
- segmentInterval
- );
+ cachingStore = new CachingSessionStore<>(sessionStore,
Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
cache = new ThreadCache(new LogContext("testCache "),
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
- context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
null, null, null, cache);
+ final InternalMockProcessorContext context = new
InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null,
cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP,
0, 0, "topic", null));
cachingStore.init(context, cachingStore);
}
@@ -134,11 +131,13 @@ public class CachingSessionStoreTest {
@Test
public void shouldFetchAllSessionsWithSameRecordKey() {
- final List<KeyValue<Windowed<Bytes>, byte[]>> expected =
Arrays.asList(KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes()),
-
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
-
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)),
"3".getBytes()),
-
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)),
"4".getBytes()));
- for (KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
+ final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes()),
+ KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)),
"2".getBytes()),
+ KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)),
"3".getBytes()),
+ KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)),
"4".getBytes())
+ );
+ for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
cachingStore.put(kv.key, kv.value);
}
@@ -184,14 +183,14 @@ public class CachingSessionStoreTest {
@Test
public void shouldFetchCorrectlyAcrossSegments() {
- final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0,
0));
- final Windowed<Bytes> a2 = new Windowed<>(keyA, new
SessionWindow(60_000, 60_000));
- final Windowed<Bytes> a3 = new Windowed<>(keyA, new
SessionWindow(120_000, 120_000));
+ final Windowed<Bytes> a1 = new Windowed<>(keyA, new
SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+ final Windowed<Bytes> a2 = new Windowed<>(keyA, new
SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+ final Windowed<Bytes> a3 = new Windowed<>(keyA, new
SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
cachingStore.put(a1, "1".getBytes());
cachingStore.put(a2, "2".getBytes());
cachingStore.put(a3, "3".getBytes());
cachingStore.flush();
- final KeyValueIterator<Windowed<Bytes>, byte[]> results =
cachingStore.findSessions(keyA, 0, 60_000 * 2);
+ final KeyValueIterator<Windowed<Bytes>, byte[]> results =
cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
assertEquals(a1, results.next().key);
assertEquals(a2, results.next().key);
assertEquals(a3, results.next().key);
@@ -200,11 +199,11 @@ public class CachingSessionStoreTest {
@Test
public void shouldFetchRangeCorrectlyAcrossSegments() {
- final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0,
0));
- final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0,
0));
- final Windowed<Bytes> a2 = new Windowed<>(keyA, new
SessionWindow(60_000, 60_000));
- final Windowed<Bytes> a3 = new Windowed<>(keyA, new
SessionWindow(60_000 * 2, 60_000 * 2));
- final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new
SessionWindow(60_000 * 2, 60_000 * 2));
+ final Windowed<Bytes> a1 = new Windowed<>(keyA, new
SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+ final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new
SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+ final Windowed<Bytes> a2 = new Windowed<>(keyA, new
SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+ final Windowed<Bytes> a3 = new Windowed<>(keyA, new
SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+ final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new
SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
cachingStore.put(a1, "1".getBytes());
cachingStore.put(aa1, "1".getBytes());
cachingStore.put(a2, "2".getBytes());
@@ -212,13 +211,13 @@ public class CachingSessionStoreTest {
cachingStore.put(aa3, "3".getBytes());
cachingStore.flush();
- final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
cachingStore.findSessions(keyA, keyAA, 0, 60_000 * 2);
- assertEquals(a1, rangeResults.next().key);
- assertEquals(aa1, rangeResults.next().key);
- assertEquals(a2, rangeResults.next().key);
- assertEquals(a3, rangeResults.next().key);
- assertEquals(aa3, rangeResults.next().key);
- assertFalse(rangeResults.hasNext());
+ final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+ final Set<Windowed<Bytes>> keys = new HashSet<>();
+ while (rangeResults.hasNext()) {
+ keys.add(rangeResults.next().key);
+ }
+ rangeResults.close();
+ assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
}
@Test
@@ -226,25 +225,28 @@ public class CachingSessionStoreTest {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0,
0));
final Windowed<String> aDeserialized = new Windowed<>("a", new
SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new
ArrayList<>();
- cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>,
String>() {
- @Override
- public void apply(final Windowed<String> key, final String
newValue, final String oldValue) {
- flushed.add(KeyValue.pair(key, new Change<>(newValue,
oldValue)));
- }
- }, true);
-
+ cachingStore.setFlushListener(
+ (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new
Change<>(newValue, oldValue))),
+ true
+ );
+
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
-
+
cachingStore.put(a, "2".getBytes());
cachingStore.flush();
cachingStore.remove(a);
cachingStore.flush();
- assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new
Change<>("1", null)),
- KeyValue.pair(aDeserialized, new
Change<>("2", "1")),
- KeyValue.pair(aDeserialized, new
Change<>(null, "2"))));
+ assertEquals(
+ flushed,
+ Arrays.asList(
+ KeyValue.pair(aDeserialized, new Change<>("1", null)),
+ KeyValue.pair(aDeserialized, new Change<>("2", "1")),
+ KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+ )
+ );
}
@Test
@@ -252,12 +254,10 @@ public class CachingSessionStoreTest {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0,
0));
final Windowed<String> aDeserialized = new Windowed<>("a", new
SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new
ArrayList<>();
- cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>,
String>() {
- @Override
- public void apply(final Windowed<String> key, final String
newValue, final String oldValue) {
- flushed.add(KeyValue.pair(key, new Change<>(newValue,
oldValue)));
- }
- }, false);
+ cachingStore.setFlushListener(
+ (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new
Change<>(newValue, oldValue))),
+ false
+ );
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
@@ -268,9 +268,14 @@ public class CachingSessionStoreTest {
cachingStore.remove(a);
cachingStore.flush();
- assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new
Change<>("1", null)),
- KeyValue.pair(aDeserialized, new
Change<>("2", null)),
- KeyValue.pair(aDeserialized, new
Change<>(null, "2"))));
+ assertEquals(
+ flushed,
+ Arrays.asList(
+ KeyValue.pair(aDeserialized, new Change<>("1", null)),
+ KeyValue.pair(aDeserialized, new Change<>("2", null)),
+ KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+ )
+ );
}
@Test
@@ -278,12 +283,10 @@ public class CachingSessionStoreTest {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0,
0));
final Windowed<String> aDeserialized = new Windowed<>("a", new
SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new
ArrayList<>();
- cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>,
String>() {
- @Override
- public void apply(final Windowed<String> key, final String
newValue, final String oldValue) {
- flushed.add(KeyValue.pair(key, new Change<>(newValue,
oldValue)));
- }
- }, false);
+ cachingStore.setFlushListener(
+ (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new
Change<>(newValue, oldValue))),
+ false
+ );
cachingStore.put(a, "1".getBytes());
cachingStore.flush();
@@ -292,8 +295,13 @@ public class CachingSessionStoreTest {
cachingStore.flush();
- assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new
Change<>("1", null)),
- KeyValue.pair(aDeserialized, new
Change<>("2", null))));
+ assertEquals(
+ flushed,
+ Arrays.asList(
+ KeyValue.pair(aDeserialized, new Change<>("1", null)),
+ KeyValue.pair(aDeserialized, new Change<>("2", null))
+ )
+ );
}
@Test
@@ -369,7 +377,7 @@ public class CachingSessionStoreTest {
cachingStore.put(null, "1".getBytes());
}
- private List<KeyValue<Windowed<Bytes>, byte[]>>
addSessionsUntilOverflow(final String...sessionIds) {
+ private List<KeyValue<Windowed<Bytes>, byte[]>>
addSessionsUntilOverflow(final String... sessionIds) {
final Random random = new Random();
final List<KeyValue<Windowed<Bytes>, byte[]>> results = new
ArrayList<>();
while (cache.size() == results.size()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 118acec..3bcb1a2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -35,7 +34,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.List;
import static org.apache.kafka.common.utils.Utils.mkList;
@@ -55,7 +53,8 @@ public class CachingWindowStoreTest {
private static final int MAX_CACHE_SIZE_BYTES = 150;
private static final long DEFAULT_TIMESTAMP = 10L;
- private static final Long WINDOW_SIZE = 10000L;
+ private static final Long WINDOW_SIZE = 10L;
+ private static final long SEGMENT_INTERVAL = 100L;
private InternalMockProcessorContext context;
private RocksDBSegmentedBytesStore underlying;
private CachingWindowStore<String, String> cachingStore;
@@ -67,20 +66,14 @@ public class CachingWindowStoreTest {
@Before
public void setUp() {
keySchema = new WindowKeySchema();
- final int retention = 60_000;
- final int segmentInterval = 60_000;
- underlying = new RocksDBSegmentedBytesStore("test", retention,
segmentInterval, keySchema);
+ underlying = new RocksDBSegmentedBytesStore("test", 0,
SEGMENT_INTERVAL, keySchema);
final RocksDBWindowStore<Bytes, byte[]> windowStore = new
RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false,
WINDOW_SIZE);
cacheListener = new
CachingKeyValueStoreTest.CacheFlushListenerStub<>();
- cachingStore = new CachingWindowStore<>(windowStore,
- Serdes.String(),
- Serdes.String(),
- WINDOW_SIZE,
- segmentInterval);
+ cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(),
Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL);
cachingStore.setFlushListener(cacheListener, false);
cache = new ThreadCache(new LogContext("testCache "),
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
topic = "topic";
- context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
null, null, (RecordCollector) null, cache);
+ context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP,
0, 0, topic, null));
cachingStore.init(context, cachingStore);
}
@@ -133,7 +126,7 @@ public class CachingWindowStoreTest {
assertFalse(iterator.hasNext());
assertEquals(2, cache.size());
}
-
+
@Test
public void shouldGetAllFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
@@ -146,46 +139,46 @@ public class CachingWindowStoreTest {
cachingStore.put(bytesKey("h"), bytesValue("h"));
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.all();
- String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
- for (String s : array) {
+ final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+ for (final String s : array) {
verifyWindowedKeyValue(iterator.next(), new
Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP +
WINDOW_SIZE)), s);
}
assertFalse(iterator.hasNext());
}
-
+
@Test
public void shouldFetchAllWithinTimestampRange() {
- String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+ final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
for (int i = 0; i < array.length; i++) {
context.setTime(i);
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
}
-
+
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.fetchAll(0, 7);
for (int i = 0; i < array.length; i++) {
- String str = array[i];
+ final String str = array[i];
verifyWindowedKeyValue(iterator.next(), new
Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
}
assertFalse(iterator.hasNext());
-
+
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
cachingStore.fetchAll(2, 4);
for (int i = 2; i <= 4; i++) {
- String str = array[i];
+ final String str = array[i];
verifyWindowedKeyValue(iterator1.next(), new
Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
}
assertFalse(iterator1.hasNext());
-
+
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
cachingStore.fetchAll(5, 7);
for (int i = 5; i <= 7; i++) {
- String str = array[i];
+ final String str = array[i];
verifyWindowedKeyValue(iterator2.next(), new
Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
}
assertFalse(iterator2.hasNext());
}
@Test
- public void shouldFlushEvictedItemsIntoUnderlyingStore() throws
IOException {
- int added = addItemsToCache();
+ public void shouldFlushEvictedItemsIntoUnderlyingStore() {
+ final int added = addItemsToCache();
// all dirty entries should have been flushed
final KeyValueIterator<Bytes, byte[]> iter =
underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP,
DEFAULT_TIMESTAMP);
final KeyValue<Bytes, byte[]> next = iter.next();
@@ -228,8 +221,8 @@ public class CachingWindowStoreTest {
}
@Test
- public void shouldForwardDirtyItemToListenerWhenEvicted() throws
IOException {
- int numRecords = addItemsToCache();
+ public void shouldForwardDirtyItemToListenerWhenEvicted() {
+ final int numRecords = addItemsToCache();
assertEquals(numRecords, cacheListener.forwarded.size());
}
@@ -257,7 +250,7 @@ public class CachingWindowStoreTest {
@Test
public void shouldIterateCacheAndStore() {
- final Bytes key = Bytes.wrap("1" .getBytes());
+ final Bytes key = Bytes.wrap("1".getBytes());
underlying.put(WindowKeySchema.toStoreKeyBinary(key,
DEFAULT_TIMESTAMP, 0), "a".getBytes());
cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
final WindowStoreIterator<byte[]> fetch =
cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP +
WINDOW_SIZE);
@@ -268,7 +261,7 @@ public class CachingWindowStoreTest {
@Test
public void shouldIterateCacheAndStoreKeyRange() {
- final Bytes key = Bytes.wrap("1" .getBytes());
+ final Bytes key = Bytes.wrap("1".getBytes());
underlying.put(WindowKeySchema.toStoreKeyBinary(key,
DEFAULT_TIMESTAMP, 0), "a".getBytes());
cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP +
WINDOW_SIZE);
@@ -311,9 +304,13 @@ public class CachingWindowStoreTest {
cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
- cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
+ cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
- final List<KeyValue<Long, byte[]>> expected = mkList(KeyValue.pair(0L,
bytesValue("0001")), KeyValue.pair(1L, bytesValue("0003")),
KeyValue.pair(60000L, bytesValue("0005")));
+ final List<KeyValue<Long, byte[]>> expected = mkList(
+ KeyValue.pair(0L, bytesValue("0001")),
+ KeyValue.pair(1L, bytesValue("0003")),
+ KeyValue.pair(SEGMENT_INTERVAL, bytesValue("0005"))
+ );
final List<KeyValue<Long, byte[]>> actual =
toList(cachingStore.fetch(bytesKey("a"), 0, Long.MAX_VALUE));
verifyKeyValueList(expected, actual);
}
@@ -324,16 +321,32 @@ public class CachingWindowStoreTest {
cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
- cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
-
- verifyKeyValueList(mkList(windowedPair("a", "0001", 0),
windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)),
- toList(cachingStore.fetch(bytesKey("a"),
bytesKey("a"), 0, Long.MAX_VALUE)));
-
- verifyKeyValueList(mkList(windowedPair("aa", "0002", 0),
windowedPair("aa", "0004", 1)),
- toList(cachingStore.fetch(bytesKey("aa"),
bytesKey("aa"), 0, Long.MAX_VALUE)));
-
- verifyKeyValueList(mkList(windowedPair("a", "0001", 0),
windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa",
"0004", 1), windowedPair("a", "0005", 60000L)),
- toList(cachingStore.fetch(bytesKey("a"),
bytesKey("aa"), 0, Long.MAX_VALUE)));
+ cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+ verifyKeyValueList(
+ mkList(
+ windowedPair("a", "0001", 0),
+ windowedPair("a", "0003", 1),
+ windowedPair("a", "0005", SEGMENT_INTERVAL)
+ ),
+ toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 0,
Long.MAX_VALUE))
+ );
+
+ verifyKeyValueList(
+ mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004",
1)),
+ toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0,
Long.MAX_VALUE))
+ );
+
+ verifyKeyValueList(
+ mkList(
+ windowedPair("a", "0001", 0),
+ windowedPair("a", "0003", 1),
+ windowedPair("aa", "0002", 0),
+ windowedPair("aa", "0004", 1),
+ windowedPair("a", "0005", SEGMENT_INTERVAL)
+ ),
+ toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 0,
Long.MAX_VALUE))
+ );
}
@Test(expected = NullPointerException.class)
@@ -361,7 +374,7 @@ public class CachingWindowStoreTest {
cachingStore.fetch(bytesKey("anyFrom"), null, 1L, 2L);
}
- private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(String key,
String value, long timestamp) {
+ private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String
key, final String value, final long timestamp) {
return KeyValue.pair(new Windowed<>(bytesKey(key), new
TimeWindow(timestamp, timestamp + WINDOW_SIZE)), bytesValue(value));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 8e69ccb..cffd73f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -38,13 +38,10 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.rocksdb.WriteBatch;
-import static
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
-
-
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -52,12 +49,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.SimpleTimeZone;
+import static
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -68,14 +66,13 @@ import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class RocksDBSegmentedBytesStoreTest {
+ private final long windowSizeForTimeWindow = 500;
private final long retention = 1000;
- private final long segmentInterval = 60_000;
- private final int numSegments = 3;
+ private final long segmentInterval = 60_000L;
private InternalMockProcessorContext context;
private final String storeName = "bytes-store";
private RocksDBSegmentedBytesStore bytesStore;
private File stateDir;
- private long windowSizeForTimeWindow = 500;
private final Window[] windows = new Window[4];
@Parameter
@@ -83,7 +80,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Parameters(name = "{0}")
public static Object[] getKeySchemas() {
- return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
+ return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
}
@Before
@@ -94,29 +91,32 @@ public class RocksDBSegmentedBytesStoreTest {
windows[0] = new SessionWindow(10, 10);
windows[1] = new SessionWindow(500, 1000);
windows[2] = new SessionWindow(1000, 1500);
- windows[3] = new SessionWindow(30000, 60000);
+ windows[3] = new SessionWindow(30_000L, 60_000L);
}
if (schema instanceof WindowKeySchema) {
windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
- windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow);
+ windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
}
- bytesStore = new RocksDBSegmentedBytesStore(storeName,
- retention,
- segmentInterval,
- schema);
+ bytesStore = new RocksDBSegmentedBytesStore(
+ storeName,
+ retention,
+ segmentInterval,
+ schema
+ );
stateDir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(
- stateDir,
- Serdes.String(),
- Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())));
+ stateDir,
+ Serdes.String(),
+ Serdes.Long(),
+ new NoOpRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
+ );
bytesStore.init(context, bytesStore);
}
@@ -134,8 +134,10 @@ public class RocksDBSegmentedBytesStoreTest {
final KeyValueIterator<Bytes, byte[]> values =
bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500);
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+ );
assertEquals(expected, toList(values));
}
@@ -147,8 +149,10 @@ public class RocksDBSegmentedBytesStoreTest {
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])),
serializeValue(50));
bytesStore.put(serializeKey(new Windowed<>(key, windows[2])),
serializeValue(100));
final KeyValueIterator<Bytes, byte[]> results =
bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999);
- final List<KeyValue<Windowed<String>, Long>> expected =
Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
- KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+ final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+ KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+ );
assertEquals(expected, toList(results));
}
@@ -181,9 +185,14 @@ public class RocksDBSegmentedBytesStoreTest {
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
- assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key,
windows[0]), 50L),
+ assertEquals(
+ Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
- KeyValue.pair(new Windowed<>(key, windows[2]), 500L)),
results);
+ KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
+ ),
+ results
+ );
}
@@ -198,13 +207,22 @@ public class RocksDBSegmentedBytesStoreTest {
assertEquals(Collections.singleton(segments.segmentName(0)),
segmentDirs());
bytesStore.put(serializeKey(new Windowed<>(key, windows[3])),
serializeValue(100L));
- assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1)), segmentDirs());
+ assertEquals(
+ Utils.mkSet(
+ segments.segmentName(0),
+ segments.segmentName(1)
+ ),
+ segmentDirs()
+ );
final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.all());
- assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key,
windows[0]), 50L),
+ assertEquals(
+ Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
- ), results);
+ ),
+ results
+ );
}
@@ -218,18 +236,27 @@ public class RocksDBSegmentedBytesStoreTest {
assertEquals(Collections.singleton(segments.segmentName(0)),
segmentDirs());
bytesStore.put(serializeKey(new Windowed<>(key, windows[3])),
serializeValue(100L));
- assertEquals(Utils.mkSet(segments.segmentName(0),
- segments.segmentName(1)), segmentDirs());
-
- final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetchAll(0L, 60000L));
- assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key,
windows[0]), 50L),
+ assertEquals(
+ Utils.mkSet(
+ segments.segmentName(0),
+ segments.segmentName(1)
+ ),
+ segmentDirs()
+ );
+
+ final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetchAll(0L, 60_000L));
+ assertEquals(
+ Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
- ), results);
+ ),
+ results
+ );
}
@Test
- public void shouldLoadSegementsWithOldStyleDateFormattedName() {
+ public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
final Segments segments = new Segments(storeName, retention,
segmentInterval);
final String key = "a";
@@ -247,20 +274,29 @@ public class RocksDBSegmentedBytesStoreTest {
final File oldStyleName = new File(parent, nameParts[0] + "-" +
formatted);
assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
- bytesStore = new RocksDBSegmentedBytesStore(storeName,
- retention,
- segmentInterval,
- schema);
+ bytesStore = new RocksDBSegmentedBytesStore(
+ storeName,
+ retention,
+ segmentInterval,
+ schema
+ );
bytesStore.init(context, bytesStore);
- final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
- assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new
Windowed<>(key, windows[0]), 50L),
- KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
+ final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
+ assertThat(
+ results,
+ equalTo(
+ Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+ )
+ )
+ );
}
@Test
- public void shouldLoadSegementsWithOldStyleColonFormattedName() {
+ public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
final Segments segments = new Segments(storeName, retention,
segmentInterval);
final String key = "a";
@@ -274,15 +310,24 @@ public class RocksDBSegmentedBytesStoreTest {
final File oldStyleName = new File(parent, nameParts[0] + ":" +
Long.parseLong(nameParts[1]));
assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
- bytesStore = new RocksDBSegmentedBytesStore(storeName,
- retention,
- segmentInterval,
- schema);
+ bytesStore = new RocksDBSegmentedBytesStore(
+ storeName,
+ retention,
+ segmentInterval,
+ schema
+ );
bytesStore.init(context, bytesStore);
- final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
- assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new
Windowed<>(key, windows[0]), 50L),
- KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
+ final List<KeyValue<Windowed<String>, Long>> results =
toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
+ assertThat(
+ results,
+ equalTo(
+ Arrays.asList(
+ KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+ KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+ )
+ )
+ );
}
@@ -304,7 +349,7 @@ public class RocksDBSegmentedBytesStoreTest {
records.add(new KeyValue<>(serializeKey(new Windowed<>(key,
windows[3])).get(), serializeValue(100L)));
final Map<Segment, WriteBatch> writeBatchMap =
bytesStore.getWriteBatches(records);
assertEquals(2, writeBatchMap.size());
- for (final WriteBatch batch: writeBatchMap.values()) {
+ for (final WriteBatch batch : writeBatchMap.values()) {
assertEquals(1, batch.count());
}
}
@@ -323,7 +368,7 @@ public class RocksDBSegmentedBytesStoreTest {
assertEquals(2, bytesStore.getSegments().size());
// Bulk loading is enabled during recovery.
- for (final Segment segment: bytesStore.getSegments()) {
+ for (final Segment segment : bytesStore.getSegments()) {
Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(),
equalTo(1 << 30));
}
@@ -347,20 +392,20 @@ public class RocksDBSegmentedBytesStoreTest {
restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
- for (final Segment segment: bytesStore.getSegments()) {
+ for (final Segment segment : bytesStore.getSegments()) {
Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(),
equalTo(1 << 30));
}
restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
- for (final Segment segment: bytesStore.getSegments()) {
+ for (final Segment segment : bytesStore.getSegments()) {
Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(),
equalTo(4));
}
}
private Set<String> segmentDirs() {
- File windowDir = new File(stateDir, storeName);
+ final File windowDir = new File(stateDir, storeName);
- return new HashSet<>(Arrays.asList(windowDir.list()));
+ return Utils.mkSet(Objects.requireNonNull(windowDir.list()));
}
private byte[] serializeValue(final long value) {
@@ -383,14 +428,14 @@ public class RocksDBSegmentedBytesStoreTest {
final KeyValue<Bytes, byte[]> next = iterator.next();
if (schema instanceof WindowKeySchema) {
final KeyValue<Windowed<String>, Long> deserialized =
KeyValue.pair(
- WindowKeySchema.fromStoreKey(next.key.get(),
windowSizeForTimeWindow, stateSerdes),
- stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
+ WindowKeySchema.fromStoreKey(next.key.get(),
windowSizeForTimeWindow, stateSerdes),
+ stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
);
results.add(deserialized);
} else {
final KeyValue<Windowed<String>, Long> deserialized =
KeyValue.pair(
- SessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), "dummy"),
- stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
+ SessionKeySchema.from(next.key.get(),
stateSerdes.keyDeserializer(), "dummy"),
+ stateSerdes.valueDeserializer().deserialize("dummy",
next.value)
);
results.add(deserialized);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index ac481a7..b0057e5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -68,9 +68,9 @@ public class RocksDBWindowStoreTest {
private final int numSegments = 3;
private final long windowSize = 3L;
- private final String windowName = "window";
- private final long segmentInterval = 60_000;
+ private final long segmentInterval = 600L;
private final long retentionPeriod = segmentInterval * (numSegments - 1);
+ private final String windowName = "window";
private final Segments segments = new Segments(windowName,
retentionPeriod, segmentInterval);
private final StateSerdes<Integer, String> serdes = new StateSerdes<>("",
Serdes.Integer(), Serdes.String());
@@ -145,8 +145,8 @@ public class RocksDBWindowStoreTest {
windowStore.put(1, "four");
// should only have 2 values as the first segment is no longer open
- assertEquals(new KeyValue<>(60000L, "two"), iterator.next());
- assertEquals(new KeyValue<>(120000L, "three"), iterator.next());
+ assertEquals(new KeyValue<>(segmentInterval, "two"), iterator.next());
+ assertEquals(new KeyValue<>(2 * segmentInterval, "three"),
iterator.next());
assertFalse(iterator.hasNext());
}
@@ -639,7 +639,7 @@ public class RocksDBWindowStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(59999);
+ setCurrentTime(segmentInterval - 1);
windowStore.put(0, "v");
windowStore.put(0, "v");
assertEquals(
@@ -647,7 +647,7 @@ public class RocksDBWindowStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(60000);
+ setCurrentTime(segmentInterval);
windowStore.put(0, "v");
assertEquals(
Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
@@ -657,7 +657,7 @@ public class RocksDBWindowStoreTest {
WindowStoreIterator iter;
int fetchedCount;
- iter = windowStore.fetch(0, 0L, 240000L);
+ iter = windowStore.fetch(0, 0L, segmentInterval * 4);
fetchedCount = 0;
while (iter.hasNext()) {
iter.next();
@@ -670,10 +670,10 @@ public class RocksDBWindowStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(180000);
+ setCurrentTime(segmentInterval * 3);
windowStore.put(0, "v");
- iter = windowStore.fetch(0, 0L, 240000L);
+ iter = windowStore.fetch(0, 0L, segmentInterval * 4);
fetchedCount = 0;
while (iter.hasNext()) {
iter.next();
@@ -686,10 +686,10 @@ public class RocksDBWindowStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(300000);
+ setCurrentTime(segmentInterval * 5);
windowStore.put(0, "v");
- iter = windowStore.fetch(0, 240000L, 1000000L);
+ iter = windowStore.fetch(0, segmentInterval * 4, segmentInterval * 10);
fetchedCount = 0;
while (iter.hasNext()) {
iter.next();
@@ -847,9 +847,9 @@ public class RocksDBWindowStoreTest {
windowStore.init(context, windowStore);
- final Bytes key1 = Bytes.wrap(new byte[]{0});
- final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
- final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
+ final Bytes key1 = Bytes.wrap(new byte[] {0});
+ final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
+ final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0});
windowStore.put(key1, "1", 0);
windowStore.put(key2, "2", 0);
windowStore.put(key3, "3", 0);
@@ -924,11 +924,7 @@ public class RocksDBWindowStoreTest {
final Integer key = WindowKeySchema.extractStoreKey(entry.key,
serdes);
final String value = entry.value == null ? null :
serdes.valueFrom(entry.value);
- Set<String> entries = entriesByKey.get(key);
- if (entries == null) {
- entries = new HashSet<>();
- entriesByKey.put(key, entries);
- }
+ final Set<String> entries = entriesByKey.computeIfAbsent(key, k ->
new HashSet<>());
entries.add(value + "@" + (timestamp - startTime));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 1fc0853..efed24f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -44,22 +44,24 @@ import static org.junit.Assert.assertTrue;
public class SegmentsTest {
private static final int NUM_SEGMENTS = 5;
+ private static final long SEGMENT_INTERVAL = 100L;
+ private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private InternalMockProcessorContext context;
private Segments segments;
- private final long segmentInterval = 60_000L;
private File stateDirectory;
- private String storeName = "test";
- private final int retentionPeriod = 4 * 60 * 1000;
+ private final String storeName = "test";
@Before
public void createContext() {
stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext(stateDirectory,
- Serdes.String(),
- Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache(new
LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
- segments = new Segments(storeName, retentionPeriod, segmentInterval);
+ context = new InternalMockProcessorContext(
+ stateDirectory,
+ Serdes.String(),
+ Serdes.Long(),
+ new NoOpRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
+ );
+ segments = new Segments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
}
@After
@@ -70,24 +72,24 @@ public class SegmentsTest {
@Test
public void shouldGetSegmentIdsFromTimestamp() {
assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(60000));
- assertEquals(2, segments.segmentId(120000));
- assertEquals(3, segments.segmentId(180000));
+ assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+ assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+ assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
}
@Test
public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
- final Segments segments = new Segments("test", 8 * 60 * 1000, 120_000);
+ final Segments segments = new Segments("test", 8 * SEGMENT_INTERVAL, 2
* SEGMENT_INTERVAL);
assertEquals(0, segments.segmentId(0));
- assertEquals(0, segments.segmentId(60000));
- assertEquals(1, segments.segmentId(120000));
+ assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
+ assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
}
@Test
- public void shouldGetSegmentNameFromId() throws Exception {
+ public void shouldGetSegmentNameFromId() {
assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + segmentInterval, segments.segmentName(1));
- assertEquals("test." + 2 * segmentInterval, segments.segmentName(2));
+ assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
+ assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
}
@Test
@@ -96,11 +98,11 @@ public class SegmentsTest {
final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
segmentInterval).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
segmentInterval).isDirectory());
- assertEquals(true, segment1.isOpen());
- assertEquals(true, segment2.isOpen());
- assertEquals(true, segment3.isOpen());
+ assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
+ assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
+ assertTrue(segment1.isOpen());
+ assertTrue(segment2.isOpen());
+ assertTrue(segment3.isOpen());
}
@Test
@@ -114,14 +116,14 @@ public class SegmentsTest {
public void shouldCleanupSegmentsThatHaveExpired() {
final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
- context.setStreamTime(segmentInterval * 7);
+ context.setStreamTime(SEGMENT_INTERVAL * 7);
final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
assertFalse(segment1.isOpen());
assertFalse(segment2.isOpen());
assertTrue(segment3.isOpen());
assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
segmentInterval).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
segmentInterval).exists());
+ assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
+ assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
}
@Test
@@ -151,6 +153,7 @@ public class SegmentsTest {
@Test
public void shouldOpenExistingSegments() {
+ segments = new Segments("test", 4, 1);
segments.getOrCreateSegmentIfLive(0, context);
segments.getOrCreateSegmentIfLive(1, context);
segments.getOrCreateSegmentIfLive(2, context);
@@ -159,7 +162,7 @@ public class SegmentsTest {
// close existing.
segments.close();
- segments = new Segments("test", 4 * 60 * 1000, 60_000);
+ segments = new Segments("test", 4, 1);
segments.openExisting(context);
assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -182,7 +185,7 @@ public class SegmentsTest {
segments.getOrCreateSegmentIfLive(3, context);
segments.getOrCreateSegmentIfLive(4, context);
- final List<Segment> segments = this.segments.segments(0, 2 * 60 *
1000);
+ final List<Segment> segments = this.segments.segments(0, 2 *
SEGMENT_INTERVAL);
assertEquals(3, segments.size());
assertEquals(0, segments.get(0).id);
assertEquals(1, segments.get(1).id);
@@ -190,14 +193,14 @@ public class SegmentsTest {
}
@Test
- public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception {
+ public void shouldGetSegmentsWithinTimeRangeOutOfOrder() {
updateStreamTimeAndCreateSegment(4);
updateStreamTimeAndCreateSegment(2);
updateStreamTimeAndCreateSegment(0);
updateStreamTimeAndCreateSegment(1);
updateStreamTimeAndCreateSegment(3);
- final List<Segment> segments = this.segments.segments(0, 2 * 60 *
1000);
+ final List<Segment> segments = this.segments.segments(0, 2 *
SEGMENT_INTERVAL);
assertEquals(3, segments.size());
assertEquals(0, segments.get(0).id);
assertEquals(1, segments.get(1).id);
@@ -241,14 +244,19 @@ public class SegmentsTest {
}
private void updateStreamTimeAndCreateSegment(final int segment) {
- context.setStreamTime(segmentInterval * segment);
+ context.setStreamTime(SEGMENT_INTERVAL * segment);
segments.getOrCreateSegmentIfLive(segment, context);
}
@Test
public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat()
throws Exception {
+ final long segmentInterval = 60_000L; // the old segment file's naming
system maxes out at 1 minute granularity.
+
+ segments = new Segments(storeName, NUM_SEGMENTS * segmentInterval,
segmentInterval);
+
final String storeDirectoryPath = stateDirectory.getAbsolutePath() +
File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
+ //noinspection ResultOfMethodCallIgnored
storeDirectory.mkdirs();
final SimpleDateFormat formatter = new
SimpleDateFormat("yyyyMMddHHmm");
@@ -256,13 +264,15 @@ public class SegmentsTest {
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
final File oldSegment = new File(storeDirectoryPath +
File.separator + storeName + "-" + formatter.format(new Date(segmentId *
segmentInterval)));
+ //noinspection ResultOfMethodCallIgnored
oldSegment.createNewFile();
}
segments.openExisting(context);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
- final File newSegment = new File(storeDirectoryPath +
File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS
- 1)));
+ final String segmentName = storeName + "." + (long) segmentId *
segmentInterval;
+ final File newSegment = new File(storeDirectoryPath +
File.separator + segmentName);
assertTrue(newSegment.exists());
}
}
@@ -271,17 +281,19 @@ public class SegmentsTest {
public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat()
throws Exception {
final String storeDirectoryPath = stateDirectory.getAbsolutePath() +
File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
+ //noinspection ResultOfMethodCallIgnored
storeDirectory.mkdirs();
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
- final File oldSegment = new File(storeDirectoryPath +
File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS
- 1)));
+ final File oldSegment = new File(storeDirectoryPath +
File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD /
(NUM_SEGMENTS - 1)));
+ //noinspection ResultOfMethodCallIgnored
oldSegment.createNewFile();
}
segments.openExisting(context);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
- final File newSegment = new File(storeDirectoryPath +
File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS
- 1)));
+ final File newSegment = new File(storeDirectoryPath +
File.separator + storeName + "." + segmentId * (RETENTION_PERIOD /
(NUM_SEGMENTS - 1)));
assertTrue(newSegment.exists());
}
}
@@ -292,6 +304,7 @@ public class SegmentsTest {
segments.close();
assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
}
+
private void verifyCorrectSegments(final long first, final int
numSegments) {
final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
assertEquals(numSegments, result.size());