This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88823c6 MINOR: cleanup some state store code (#5656)
88823c6 is described below
commit 88823c6016ea2e306340938994d9e122abf3c6c0
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Sep 18 14:19:41 2018 -0700
MINOR: cleanup some state store code (#5656)
Reviewers: John Roesler <[email protected]>, Bill Bejeck
<[email protected]>, Guozhang Wang <[email protected]>
---
.../apache/kafka/streams/state/KeyValueStore.java | 33 +++++----
.../kafka/streams/state/QueryableStoreType.java | 21 +++---
.../kafka/streams/state/ReadOnlyKeyValueStore.java | 5 +-
.../kafka/streams/state/ReadOnlyWindowStore.java | 4 +-
.../kafka/streams/state/RocksDBConfigSetter.java | 4 +-
.../streams/state/SessionBytesStoreSupplier.java | 14 ++--
.../apache/kafka/streams/state/StoreBuilder.java | 1 -
.../streams/state/internals/RocksDBStore.java | 79 ++++++++--------------
.../streams/state/internals/RocksDBStoreTest.java | 48 ++++++-------
9 files changed, 94 insertions(+), 115 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 3685229..b104ad4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -30,42 +30,41 @@ import java.util.List;
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
/**
- * Update the value associated with this key
+ * Update the value associated with this key.
*
* @param key The key to associate the value to
- * @param value The value to update, it can be null;
- * if the serialized bytes are also null it is interpreted as deletes
- * @throws NullPointerException If null is used for key.
+ * @param value The value to update, it can be {@code null};
+ * if the serialized bytes are also {@code null} it is interpreted as deletes
+ * @throws NullPointerException If {@code null} is used for key.
*/
void put(K key, V value);
/**
- * Update the value associated with this key, unless a value
- * is already associated with the key
+ * Update the value associated with this key, unless a value is already associated with the key.
*
* @param key The key to associate the value to
- * @param value The value to update, it can be null;
- * if the serialized bytes are also null it is interpreted as deletes
- * @return The old value or null if there is no such key.
- * @throws NullPointerException If null is used for key.
+ * @param value The value to update, it can be {@code null};
+ * if the serialized bytes are also {@code null} it is interpreted as deletes
+ * @return The old value or {@code null} if there is no such key.
+ * @throws NullPointerException If {@code null} is used for key.
*/
V putIfAbsent(K key, V value);
/**
- * Update all the given key/value pairs
+ * Update all the given key/value pairs.
*
* @param entries A list of entries to put into the store;
- * if the serialized bytes are also null it is interpreted as deletes
- * @throws NullPointerException If null is used for key.
+ * if the serialized bytes are also {@code null} it is interpreted as deletes
+ * @throws NullPointerException If {@code null} is used for key.
*/
void putAll(List<KeyValue<K, V>> entries);
/**
- * Delete the value from the store (if there is one)
+ * Delete the value from the store (if there is one).
*
* @param key The key
- * @return The old value or null if there is no such key.
- * @throws NullPointerException If null is used for key.
+ * @return The old value or {@code null} if there is no such key.
+ * @throws NullPointerException If {@code null} is used for key.
*/
V delete(K key);
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 11b849b..6ba6672 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -16,33 +16,34 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
/**
- * Used to enable querying of custom {@link StateStore} types via the
- * {@link org.apache.kafka.streams.KafkaStreams}
- * API.
- * @see QueryableStoreTypes
+ * Used to enable querying of custom {@link StateStore} types via the {@link KafkaStreams} API.
*
- * @param <T> The store type
+ * @param <T> The store type
+ * @see QueryableStoreTypes
*/
public interface QueryableStoreType<T> {
/**
* Called when searching for {@link StateStore}s to see if they
- * match the type expected by implementors of this interface
+ * match the type expected by implementors of this interface.
+ *
* @param stateStore The stateStore
* @return true if it is a match
*/
boolean accepts(final StateStore stateStore);
/**
- * Create an instance of T (usually a facade) that developers can use
- * to query the underlying {@link StateStore}s
+ * Create an instance of {@code T} (usually a facade) that developers can use
+ * to query the underlying {@link StateStore}s.
+ *
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
- * @return T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
+ * @return a read-only interface over a {@code StateStore} (cf. {@link QueryableStoreTypes.KeyValueStoreType})
*/
T create(final StateStoreProvider storeProvider, final String storeName);
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 8725ebc..9b2f8f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -19,9 +19,8 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
/**
- * A key value store that only supports read operations.
- * Implementations should be thread-safe as concurrent reads and writes
- * are expected.
+ * A key-value store that only supports read operations.
+ * Implementations should be thread-safe as concurrent reads and writes are expected.
*
* Please note that this contract defines the thread-safe read functionality only; it does not
* guarantee anything about whether the actual instance is writable by another thread, or
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index dea759f..0c46fc2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Windowed;
/**
* A window store that only supports read operations
- * Implementations should be thread-safe as concurrent reads and writes
- * are expected.
+ * Implementations should be thread-safe as concurrent reads and writes are expected.
+ *
* @param <K> Type of keys
* @param <V> Type of values
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
index 1dba933..b65baa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
@@ -21,8 +21,8 @@ import org.rocksdb.Options;
import java.util.Map;
/**
- * An interface to that allows developers to customize the RocksDB settings
- * for a given Store. Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
+ * An interface to that allows developers to customize the RocksDB settings for a given Store.
+ * Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
*/
public interface RocksDBConfigSetter {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index 6954089..5c7bc25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
- * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Byte, byte[]>} instances.
*
- * For any stores implementing the {@link SessionStore SessionStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
- *
- * 1. Null value bytes in put operations should be treated as delete.
- * 2. Null value bytes should never be returned in range query results.
+ * For any stores implementing the {@link SessionStore SessionStore<Byte, byte[]>} interface, {@code null} value
+ * bytes are considered as "not exist". This means:
+ * <ol>
+ * <li>{@code null} value bytes in put operations should be treated as delete.</li>
+ * <li>{@code null} value bytes should never be returned in range query results.</li>
+ * </ol>
*/
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
@@ -42,4 +44,4 @@ public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<By
* @return retentionPeriod
*/
long retentionPeriod();
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
index a930468..430ba27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
@@ -61,7 +61,6 @@ public interface StoreBuilder<T extends StateStore> {
*/
T build();
-
/**
* Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}.
* <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index fbf7df3..bf748fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -44,7 +44,6 @@ import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection;
@@ -69,14 +68,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
- private static final int TTL_NOT_USED = -1;
-
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
- private static final int TTL_SECONDS = TTL_NOT_USED;
private static final int MAX_WRITE_BUFFERS = 3;
private static final String DB_FILE_DIR = "rocksdb";
@@ -152,10 +148,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
options.prepareForBulkLoad();
}
- this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
+ dbDir = new File(new File(context.stateDir(), parentDir), name);
try {
- this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+ try {
+ Files.createDirectories(dbDir.getParentFile().toPath());
+ db = RocksDB.open(options, dbDir.getAbsolutePath());
+ } catch (final RocksDBException e) {
+ throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
+ }
} catch (final IOException e) {
throw new ProcessorStateException(e);
}
@@ -166,30 +167,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
public void init(final ProcessorContext context,
final StateStore root) {
// open the DB dir
- this.internalProcessorContext = context;
+ internalProcessorContext = context;
openDB(context);
- this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
+ batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
- context.register(root, this.batchingStateRestoreCallback);
- }
-
- private RocksDB openDB(final File dir,
- final Options options,
- final int ttl) throws IOException {
- try {
- if (ttl == TTL_NOT_USED) {
- Files.createDirectories(dir.getParentFile().toPath());
- return RocksDB.open(options, dir.getAbsolutePath());
- } else {
- throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
- // TODO: support TTL with change log?
- // return TtlDB.open(options, dir.toString(), ttl, false);
- }
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
- }
+ context.register(root, batchingStateRestoreCallback);
}
// visible for testing
@@ -199,7 +183,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
@Override
public String name() {
- return this.name;
+ return name;
}
@Override
@@ -220,16 +204,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private void validateStoreOpen() {
if (!open) {
- throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
+ throw new InvalidStateStoreException("Store " + name + " is currently closed");
}
}
private byte[] getInternal(final byte[] rawKey) {
try {
- return this.db.get(rawKey);
+ return db.get(rawKey);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e);
+ throw new ProcessorStateException("Error while getting value for key %s from store " + name, e);
}
}
@@ -238,18 +222,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check
// for bulk loading
- final String[] sstFileNames = dbDir.list(new FilenameFilter() {
- @Override
- public boolean accept(final File dir, final String name) {
- return SST_FILE_EXTENSION.matcher(name).matches();
- }
- });
+ final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
if (sstFileNames != null && sstFileNames.length > 0) {
try {
- this.db.compactRange(true, 1, 0);
+ db.compactRange(true, 1, 0);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e);
+ throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
}
}
}
@@ -279,7 +258,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
return originalValue;
}
- void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+ private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
try (final WriteBatch batch = new WriteBatch()) {
for (final KeyValue<byte[], byte[]> record : records) {
if (record.value == null) {
@@ -290,7 +269,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
write(batch);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
+ throw new ProcessorStateException("Error restoring batch to store " + name, e);
}
}
@@ -301,14 +280,14 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
db.delete(wOptions, rawKey);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while removing key %s from store " + this.name, e);
+ throw new ProcessorStateException("Error while removing key %s from store " + name, e);
}
} else {
try {
db.put(wOptions, rawKey, rawValue);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e);
+ throw new ProcessorStateException("Error while putting key %s value %s into store " + name, e);
}
}
}
@@ -330,7 +309,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
write(batch);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
+ throw new ProcessorStateException("Error while batch writing to store " + name, e);
}
}
@@ -384,9 +363,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
validateStoreOpen();
final long value;
try {
- value = this.db.getLongProperty("rocksdb.estimate-num-keys");
+ value = db.getLongProperty("rocksdb.estimate-num-keys");
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error fetching property from store " + this.name, e);
+ throw new ProcessorStateException("Error fetching property from store " + name, e);
}
if (isOverflowing(value)) {
return Long.MAX_VALUE;
@@ -415,7 +394,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
try {
db.flush(fOptions);
} catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
+ throw new ProcessorStateException("Error while executing flush from store " + name, e);
}
}
@@ -480,7 +459,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (!iter.isValid()) {
return allDone();
} else {
- next = this.getKeyValue();
+ next = getKeyValue();
iter.next();
return next;
}
@@ -524,8 +503,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
final Bytes to) {
super(storeName, iter);
iter.seek(from.get());
- this.rawToKey = to.get();
- if (this.rawToKey == null) {
+ rawToKey = to.get();
+ if (rawToKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
}
}
@@ -537,7 +516,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (next == null) {
return allDone();
} else {
- if (comparator.compare(next.key.get(), this.rawToKey) <= 0)
+ if (comparator.compare(next.key.get(), rawToKey) <= 0)
return next;
else
return allDone();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 5ae32eb..b77b02d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -39,7 +39,6 @@ import org.rocksdb.Options;
import java.io.File;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -47,6 +46,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -55,8 +55,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RocksDBStoreTest {
- private Serializer<String> stringSerializer = new StringSerializer();
- private Deserializer<String> stringDeserializer = new StringDeserializer();
+ private final Serializer<String> stringSerializer = new StringSerializer();
+ private final Deserializer<String> stringDeserializer = new StringDeserializer();
private RocksDBStore rocksDBStore;
private InternalMockProcessorContext context;
private File dir;
@@ -94,7 +94,7 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception {
+ public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
rocksDBStore.init(context, rocksDBStore);
final String message = "how can a 4 ounce bird carry a 2lb coconut";
@@ -106,8 +106,8 @@ public class RocksDBStoreTest {
final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
- final byte[] restoredKey = "restoredKey".getBytes("UTF-8");
- final byte[] restoredValue = "restoredValue".getBytes("UTF-8");
+ final byte[] restoredKey = "restoredKey".getBytes(UTF_8);
+ final byte[] restoredValue = "restoredValue".getBytes(UTF_8);
restoreBytes.add(KeyValue.pair(restoredKey, restoredValue));
context.restore("test", restoreBytes);
@@ -191,7 +191,7 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception {
+ public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
rocksDBStore.init(context, rocksDBStore);
@@ -208,7 +208,7 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldRestoreAll() throws Exception {
+ public void shouldRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
rocksDBStore.init(context, rocksDBStore);
@@ -246,9 +246,9 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldHandleDeletesOnRestoreAll() throws Exception {
+ public void shouldHandleDeletesOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
+ entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
rocksDBStore.init(context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
@@ -264,15 +264,15 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldHandleDeletesAndPutbackOnRestoreAll() throws Exception {
+ public void shouldHandleDeletesAndPutbackOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
- entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
+ entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8)));
+ entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8)));
// this will be deleted
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
- entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+ entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
+ entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
// this will restore key "1" as WriteBatch applies updates in order
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8")));
+ entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8)));
rocksDBStore.init(context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
@@ -304,7 +304,7 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldRestoreThenDeleteOnRestoreAll() throws Exception {
+ public void shouldRestoreThenDeleteOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
rocksDBStore.init(context, rocksDBStore);
@@ -329,9 +329,9 @@ public class RocksDBStoreTest {
entries.clear();
- entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
- entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
+ entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8)));
+ entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
+ entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
context.restore(rocksDBStore.name(), entries);
@@ -423,11 +423,11 @@ public class RocksDBStoreTest {
}
}
- private List<KeyValue<byte[], byte[]>> getKeyValueEntries() throws UnsupportedEncodingException {
+ private List<KeyValue<byte[], byte[]>> getKeyValueEntries() {
final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
- entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
- entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+ entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8)));
+ entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8)));
+ entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
return entries;
}
}