This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch support_kv_store in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 991aefd5a97af667ce946f0701d44ed06702e27f Author: Antoine Toulme <[email protected]> AuthorDate: Wed Feb 12 15:03:36 2020 -0800 Make key-value stores generic --- dependency-versions.gradle | 1 + .../tuweni/eth/repository/BlockchainRepository.kt | 16 +-- kv/build.gradle | 1 + .../apache/tuweni/kv/EntityManagerKeyValueStore.kt | 86 +++++++++++++++ .../apache/tuweni/kv/InfinispanKeyValueStore.kt | 16 +-- .../kotlin/org/apache/tuweni/kv/KeyValueStore.kt | 15 ++- .../org/apache/tuweni/kv/LevelDBKeyValueStore.kt | 67 +++++++++--- .../org/apache/tuweni/kv/MapDBKeyValueStore.kt | 33 ++++-- .../org/apache/tuweni/kv/MapKeyValueStore.kt | 17 ++- .../org/apache/tuweni/kv/RedisKeyValueStore.kt | 115 ++++++++++++++++++--- .../org/apache/tuweni/kv/RocksDBKeyValueStore.kt | 66 +++++++++--- .../org/apache/tuweni/kv/SQLKeyValueStore.kt | 74 +++++++++---- .../org/apache/tuweni/kv/KeyValueStoreTest.java | 23 ++++- .../apache/tuweni/kv/RedisKeyValueStoreTest.java | 19 +++- .../org/apache/tuweni/kv/KeyValueStoreSpec.kt | 72 +++++++++++-- 15 files changed, 505 insertions(+), 116 deletions(-) diff --git a/dependency-versions.gradle b/dependency-versions.gradle index 9c3d571..fa8d1cd 100644 --- a/dependency-versions.gradle +++ b/dependency-versions.gradle @@ -29,6 +29,7 @@ dependencyManagement { dependency('info.picocli:picocli:4.0.0-alpha-2') dependency('io.lettuce:lettuce-core:5.1.3.RELEASE') dependency('io.vertx:vertx-core:3.6.2') + dependency('javax.persistence:javax.persistence-api:2.2') dependencySet(group: 'org.antlr', version: '4.7.1') { entry 'antlr4' entry 'antlr4-runtime' diff --git a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt index b7c478e..9c4e30a 100644 --- a/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt +++ b/eth-repository/src/main/kotlin/org/apache/tuweni/eth/repository/BlockchainRepository.kt @@ -41,10 +41,10 @@ class BlockchainRepository * @param blockchainIndex the blockchain index to index values */ ( - private val chainMetadata: KeyValueStore, - private val blockBodyStore: KeyValueStore, - private val blockHeaderStore: KeyValueStore, - private val transactionReceiptsStore: KeyValueStore, + private val chainMetadata: KeyValueStore<Bytes, Bytes>, + private val blockBodyStore: KeyValueStore<Bytes, Bytes>, + private val blockHeaderStore: KeyValueStore<Bytes, Bytes>, + private val transactionReceiptsStore: KeyValueStore<Bytes, Bytes>, private val blockchainIndex: BlockchainIndex ) { @@ -58,10 +58,10 @@ class BlockchainRepository * @return a new blockchain repository made from the metadata passed in parameter. */ suspend fun init( - blockBodyStore: KeyValueStore, - blockHeaderStore: KeyValueStore, - chainMetadata: KeyValueStore, - transactionReceiptsStore: KeyValueStore, + blockBodyStore: KeyValueStore<Bytes, Bytes>, + blockHeaderStore: KeyValueStore<Bytes, Bytes>, + chainMetadata: KeyValueStore<Bytes, Bytes>, + transactionReceiptsStore: KeyValueStore<Bytes, Bytes>, blockchainIndex: BlockchainIndex, genesisBlock: Block ): BlockchainRepository { diff --git a/kv/build.gradle b/kv/build.gradle index 0f20ad0..d05b5c2 100644 --- a/kv/build.gradle +++ b/kv/build.gradle @@ -19,6 +19,7 @@ dependencies { compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava' compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8' compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' + compile 'javax.persistence:javax.persistence-api' compileOnly 'com.jolbox:bonecp' compileOnly 'io.lettuce:lettuce-core' compileOnly 'org.fusesource.leveldbjni:leveldbjni-all' diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt new file mode 100644 index 0000000..8b653ad --- /dev/null +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/EntityManagerKeyValueStore.kt @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tuweni.kv + +import kotlin.jvm.Throws +import kotlinx.coroutines.Dispatchers + +import java.io.IOException +import java.util.function.Function +import java.util.function.Supplier +import java.util.stream.Stream +import javax.persistence.EntityManager +import kotlin.coroutines.CoroutineContext + +class EntityManagerKeyValueStore<K, V> + @Throws(IOException::class) + constructor( + private val entityManagerProvider: () -> EntityManager, + private val entityClass: Class<V>, + private val idAccessor: (V) -> K, + override val coroutineContext: CoroutineContext = Dispatchers.IO + ) : KeyValueStore<K, V> { + + companion object { + /** + * Open a relational database backed key-value store using a JPA entity manager. + * + * @param entityManagerProvider The supplier of entity manager to operate. + * @return A key-value store. + * @throws IOException If an I/O error occurs. + */ + @JvmStatic + @Throws(IOException::class) + fun <K, V> open( + entityManagerProvider: Supplier<EntityManager>, + entityClass: Class<V>, + idAccessor: Function<V, K> + ) = EntityManagerKeyValueStore<K, V>(entityManagerProvider::get, entityClass, idAccessor::apply) + } + + override suspend fun get(key: K): V? { + val em = entityManagerProvider() + em.transaction.begin() + try { + return em.find(entityClass, key) + } finally { + em.transaction.commit() + em.close() + } + } + + override suspend fun put(key: K, value: V) { + val em = entityManagerProvider() + em.transaction.begin() + try { + em.merge(value) + } finally { + em.transaction.commit() + em.close() + } + } + + override suspend fun keys(): Iterable<K> { + val em = entityManagerProvider() + val query = em.createQuery(em.criteriaBuilder.createQuery(entityClass)) + val resultStream: Stream<V> = query.resultStream + return Iterable { resultStream.map(idAccessor).iterator() } + } + + override fun close() { + } +} diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt index 85a8b92..50e353f 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/InfinispanKeyValueStore.kt @@ -18,7 +18,7 @@ package org.apache.tuweni.kv import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.future.await -import org.apache.tuweni.bytes.Bytes +import org.checkerframework.checker.units.qual.K import org.infinispan.Cache import kotlin.coroutines.CoroutineContext @@ -26,10 +26,10 @@ import kotlin.coroutines.CoroutineContext * A key-value store backed by [Infinispan](https://infinispan.org) * */ -class InfinispanKeyValueStore constructor( - private val cache: Cache<Bytes, Bytes>, +class InfinispanKeyValueStore<K, V> constructor( + private val cache: Cache<K, V>, override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { @@ -40,16 +40,16 @@ class InfinispanKeyValueStore constructor( * @return A key-value store. */ @JvmStatic - fun open(cache: Cache<Bytes, Bytes>) = InfinispanKeyValueStore(cache) + fun <K, V> open(cache: Cache<K, V>) = InfinispanKeyValueStore(cache) } - override suspend fun get(key: Bytes): Bytes? = cache.getAsync(key).await() + override suspend fun get(key: K): V? = cache.getAsync(key).await() - override suspend fun put(key: Bytes, value: Bytes) { + override suspend fun put(key: K, value: V) { cache.putAsync(key, value).await() } - override suspend fun keys(): Iterable<Bytes> = cache.keys + override suspend fun keys(): Iterable<K> = cache.keys /** * The cache is managed outside the scope of this key-value store. diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt index 588eddf..7f2775a 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/KeyValueStore.kt @@ -17,7 +17,6 @@ package org.apache.tuweni.kv import kotlinx.coroutines.CoroutineScope -import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.concurrent.AsyncCompletion import org.apache.tuweni.concurrent.AsyncResult import org.apache.tuweni.concurrent.coroutines.asyncCompletion @@ -27,7 +26,7 @@ import java.io.Closeable /** * A key-value store. */ -interface KeyValueStore : Closeable, CoroutineScope { +interface KeyValueStore<K, V> : Closeable, CoroutineScope { /** * Retrieves data from the store. @@ -35,7 +34,7 @@ interface KeyValueStore : Closeable, CoroutineScope { * @param key The key for the content. * @return The stored data, or null if no data was stored under the specified key. */ - suspend fun get(key: Bytes): Bytes? + suspend fun get(key: K): V? /** * Retrieves data from the store. @@ -44,7 +43,7 @@ interface KeyValueStore : Closeable, CoroutineScope { * @return An [AsyncResult] that will complete with the stored content, * or an empty optional if no content was available. */ - fun getAsync(key: Bytes): AsyncResult<Bytes?> = asyncResult { get(key) } + fun getAsync(key: K): AsyncResult<V?> = asyncResult { get(key) } /** * Puts data into the store. @@ -52,7 +51,7 @@ interface KeyValueStore : Closeable, CoroutineScope { * @param key The key to associate with the data, for use when retrieving. * @param value The data to store. */ - suspend fun put(key: Bytes, value: Bytes) + suspend fun put(key: K, value: V) /** * Puts data into the store. @@ -64,19 +63,19 @@ interface KeyValueStore : Closeable, CoroutineScope { * @param value The data to store. * @return An [AsyncCompletion] that will complete when the content is stored. */ - fun putAsync(key: Bytes, value: Bytes): AsyncCompletion = asyncCompletion { put(key, value) } + fun putAsync(key: K, value: V): AsyncCompletion = asyncCompletion { put(key, value) } /** * Provides an iterator over the keys of the store. * * @return An [Iterable] allowing to iterate over the set of keys. */ - suspend fun keys(): Iterable<Bytes> + suspend fun keys(): Iterable<K> /** * Provides an iterator over the keys of the store. * * @return An [Iterable] allowing to iterate over the set of keys. */ - fun keysAsync(): AsyncResult<Iterable<Bytes>> = asyncResult { keys() } + fun keysAsync(): AsyncResult<Iterable<K>> = asyncResult { keys() } } diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt index 21ba81d..f0e8985 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/LevelDBKeyValueStore.kt @@ -25,49 +25,91 @@ import org.iq80.leveldb.Options import java.io.IOException import java.nio.file.Files import java.nio.file.Path +import java.util.function.Function import kotlin.coroutines.CoroutineContext /** * A key-value store backed by LevelDB. * * @param dbPath The path to the levelDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @param options Options for the levelDB database. * @param coroutineContext The co-routine context for blocking tasks. * @return A key-value store. * @throws IOException If an I/O error occurs. * @constructor Open a LevelDB-backed key-value store. */ -class LevelDBKeyValueStore +class LevelDBKeyValueStore<K, V> @Throws(IOException::class) constructor( dbPath: Path, + private val keySerializer: (K) -> Bytes, + private val valueSerializer: (V) -> Bytes, + private val keyDeserializer: (Bytes) -> K, + private val valueDeserializer: (Bytes) -> V, options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()), override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { + /** * Open a LevelDB-backed key-value store. * * @param dbPath The path to the levelDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(dbPath: Path) = LevelDBKeyValueStore(dbPath) + fun <K, V> open( + dbPath: Path, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V> + ): LevelDBKeyValueStore<K, V> = + LevelDBKeyValueStore(dbPath, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply) /** * Open a LevelDB-backed key-value store. * * @param dbPath The path to the levelDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @param options Options for the levelDB database. * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(dbPath: Path, options: Options) = LevelDBKeyValueStore(dbPath, options) + fun <K, V> open( + dbPath: Path, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V>, + options: Options + ) = + LevelDBKeyValueStore(dbPath, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply, + options) } private val db: DB @@ -77,27 +119,28 @@ constructor( db = JniDBFactory.factory.open(dbPath.toFile(), options) } - override suspend fun get(key: Bytes): Bytes? { - val rawValue = db[key.toArrayUnsafe()] + override suspend fun get(key: K): V? { + val rawValue = db[keySerializer(key).toArrayUnsafe()] return if (rawValue == null) { null } else { - Bytes.wrap(rawValue) + valueDeserializer(Bytes.wrap(rawValue)) } } - override suspend fun put(key: Bytes, value: Bytes) = db.put(key.toArrayUnsafe(), value.toArrayUnsafe()) + override suspend fun put(key: K, value: V) = db.put(keySerializer(key).toArrayUnsafe(), + valueSerializer(value).toArrayUnsafe()) - private class BytesIterator(val iter: DBIterator) : Iterator<Bytes> { + private class KIterator<K>(val iter: DBIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> { override fun hasNext(): Boolean = iter.hasNext() - override fun next(): Bytes = Bytes.wrap(iter.next().key) + override fun next(): K = keyDeserializer(Bytes.wrap(iter.next().key)) } - override suspend fun keys(): Iterable<Bytes> { + override suspend fun keys(): Iterable<K> { val iter = db.iterator() iter.seekToFirst() - return Iterable { BytesIterator(iter) } + return Iterable { KIterator(iter, keyDeserializer) } } /** diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt index f5bf128..190108f 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapDBKeyValueStore.kt @@ -32,29 +32,48 @@ import kotlin.coroutines.CoroutineContext * A key-value store backed by a MapDB instance. * * @param dbPath The path to the MapDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @param coroutineContext The co-routine context for blocking tasks. * @return A key-value store. * @throws IOException If an I/O error occurs. * @constructor Open a MapDB-backed key-value store. */ -class MapDBKeyValueStore +class MapDBKeyValueStore<K, V> @Throws(IOException::class) constructor( dbPath: Path, + private val keySerializer: (K) -> Bytes, + private val valueSerializer: (V) -> Bytes, + private val keyDeserializer: (Bytes) -> K, + private val valueDeserializer: (Bytes?) -> V?, override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { /** * Open a MapDB-backed key-value store. * * @param dbPath The path to the MapDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(dbPath: Path) = MapDBKeyValueStore(dbPath) + fun <K, V> open( + dbPath: Path, + keySerializer: (K) -> Bytes, + valueSerializer: (V) -> Bytes, + keyDeserializer: (Bytes) -> K, + valueDeserializer: (Bytes?) -> V? + ) = + MapDBKeyValueStore<K, V>(dbPath, keySerializer, valueSerializer, keyDeserializer, valueDeserializer) } private val db: DB @@ -70,14 +89,14 @@ constructor( ).createOrOpen() } - override suspend fun get(key: Bytes): Bytes? = storageData[key] + override suspend fun get(key: K): V? = valueDeserializer(storageData[keySerializer(key)]) - override suspend fun put(key: Bytes, value: Bytes) { - storageData[key] = value + override suspend fun put(key: K, value: V) { + storageData[keySerializer(key)] = valueSerializer(value) db.commit() } - override suspend fun keys(): Iterable<Bytes> = storageData.keys + override suspend fun keys(): Iterable<K> = storageData.keys.map(keyDeserializer) /** * Closes the underlying MapDB instance. diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt index bc15460..5eaca56 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/MapKeyValueStore.kt @@ -17,7 +17,6 @@ package org.apache.tuweni.kv import kotlinx.coroutines.Dispatchers -import org.apache.tuweni.bytes.Bytes import kotlin.coroutines.CoroutineContext /** @@ -27,11 +26,11 @@ import kotlin.coroutines.CoroutineContext * @return A key-value store. * @constructor Open an in-memory key-value store. */ -class MapKeyValueStore +class MapKeyValueStore<K, V> constructor( - private val map: MutableMap<Bytes, Bytes> = HashMap(), + private val map: MutableMap<K, V> = HashMap(), override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { /** @@ -42,7 +41,7 @@ constructor( * @return A key-value store. */ @JvmStatic - fun open(): MapKeyValueStore = MapKeyValueStore() + fun <K, V> open() = MapKeyValueStore<K, V>() /** * Open an in-memory key-value store. @@ -51,16 +50,16 @@ constructor( * @return A key-value store. */ @JvmStatic - fun open(map: MutableMap<Bytes, Bytes>) = MapKeyValueStore(map) + fun <K, V> open(map: MutableMap<K, V>) = MapKeyValueStore(map) } - override suspend fun get(key: Bytes): Bytes? = map[key] + override suspend fun get(key: K): V? = map[key] - override suspend fun put(key: Bytes, value: Bytes) { + override suspend fun put(key: K, value: V) { map[key] = value } - override suspend fun keys(): Iterable<Bytes> = map.keys + override suspend fun keys(): Iterable<K> = map.keys /** * Has no effect in this KeyValueStore implementation. diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt index 8e8717c..f22f0cd 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RedisKeyValueStore.kt @@ -24,58 +24,131 @@ import io.lettuce.core.codec.RedisCodec import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.future.await import org.apache.tuweni.bytes.Bytes +import org.checkerframework.checker.units.qual.K import java.net.InetAddress import java.util.concurrent.CompletionStage +import java.util.function.Function import kotlin.coroutines.CoroutineContext /** * A key-value store backed by Redis. * * @param uri The uri to the Redis store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes + * @param coroutineContext the co-routine context in which this store executes * @constructor Open a Redis-backed key-value store. */ -class RedisKeyValueStore( +class RedisKeyValueStore<K, V>( uri: String, + private val keySerializer: (K) -> Bytes, + private val valueSerializer: (V) -> Bytes, + private val keyDeserializer: (Bytes) -> K, + private val valueDeserializer: (Bytes) -> V, override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { /** * Open a Redis-backed key-value store. * * @param uri The uri to the Redis store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. */ @JvmStatic - fun open(uri: String) = RedisKeyValueStore(uri) + fun <K, V> open( + uri: String, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V> + ) = RedisKeyValueStore(uri, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply) /** * Open a Redis-backed key-value store. * * @param port The port for the Redis store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. */ @JvmStatic - fun open(port: Int) = RedisKeyValueStore(port) + fun <K, V> open( + port: Int, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V> + ) = + RedisKeyValueStore(port = port, + keySerializer = keySerializer::apply, + valueSerializer = valueSerializer::apply, + keyDeserializer = keyDeserializer::apply, + valueDeserializer = valueDeserializer::apply) /** * Open a Redis-backed key-value store. * * @param address The address for the Redis store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. */ @JvmStatic - fun open(address: InetAddress) = RedisKeyValueStore(6379, address) + fun <K, V> open( + address: InetAddress, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V> + ) = + RedisKeyValueStore(6379, + address, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply) /** * Open a Redis-backed key-value store. * * @param port The port for the Redis store. * @param address The address for the Redis store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. */ @JvmStatic - fun open(port: Int, address: InetAddress) = RedisKeyValueStore(port, address) + fun <K, V> open( + port: Int, + address: InetAddress, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V> + ) = + RedisKeyValueStore(port, + address, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply) /** * A [RedisCodec] for working with Bytes classes. @@ -94,12 +167,24 @@ class RedisKeyValueStore( * * @param port The port for the Redis store. * @param address The address for the Redis store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes */ @JvmOverloads constructor( port: Int = 6379, - address: InetAddress = InetAddress.getLoopbackAddress() - ) : this(RedisURI.create(address.hostAddress, port).toURI().toString()) + address: InetAddress = InetAddress.getLoopbackAddress(), + keySerializer: (K) -> Bytes, + valueSerializer: (V) -> Bytes, + keyDeserializer: (Bytes) -> K, + valueDeserializer: (Bytes) -> V + ) : this(RedisURI.create(address.hostAddress, port).toURI().toString(), + keySerializer, + valueSerializer, + keyDeserializer, + valueDeserializer) init { val redisClient = RedisClient.create(uri) @@ -107,14 +192,20 @@ class RedisKeyValueStore( asyncCommands = conn.async() } - override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await() + override suspend fun get(key: K): V? = asyncCommands.get(keySerializer(key)).thenApply { + if (it == null) { + null + } else { + valueDeserializer(it) + } + }.await() - override suspend fun put(key: Bytes, value: Bytes) { - val future: CompletionStage<String> = asyncCommands.set(key, value) + override suspend fun put(key: K, value: V) { + val future: CompletionStage<String> = asyncCommands.set(keySerializer(key), valueSerializer(value)) future.await() } - override suspend fun keys(): Iterable<Bytes> = asyncCommands.keys(Bytes.EMPTY).await() + override suspend fun keys(): Iterable<K> = asyncCommands.keys(Bytes.EMPTY).await().map(keyDeserializer) override fun close() { conn.close() diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt index 7911803..30e8dad 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/RocksDBKeyValueStore.kt @@ -25,6 +25,7 @@ import java.io.IOException import java.nio.file.Files import java.nio.file.Path import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Function import kotlin.coroutines.CoroutineContext /** @@ -37,37 +38,72 @@ import kotlin.coroutines.CoroutineContext * @throws IOException If an I/O error occurs. * @constructor Open a RocksDB-backed key-value store. */ -class RocksDBKeyValueStore +class RocksDBKeyValueStore<K, V> @Throws(IOException::class) constructor( dbPath: Path, + private val keySerializer: (K) -> Bytes, + private val valueSerializer: (V) -> Bytes, + private val keyDeserializer: (Bytes) -> K, + private val valueDeserializer: (Bytes) -> V, options: Options = Options().setCreateIfMissing(true).setWriteBufferSize(268435456).setMaxOpenFiles(-1), override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { /** * Open a RocksDB-backed key-value store. * * @param dbPath The path to the RocksDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(dbPath: Path) = RocksDBKeyValueStore(dbPath) + fun <K, V> open( + dbPath: Path, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V> + ) = RocksDBKeyValueStore(dbPath, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply) /** * Open a RocksDB-backed key-value store. * * @param dbPath The path to the RocksDB database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @param options Options for the RocksDB database. * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(dbPath: Path, options: Options) = RocksDBKeyValueStore(dbPath, options) + fun <K, V> open( + dbPath: Path, + keySerializer: Function<K, Bytes>, + valueSerializer: Function<V, Bytes>, + keyDeserializer: Function<Bytes, K>, + valueDeserializer: Function<Bytes, V>, + options: Options + ) = + RocksDBKeyValueStore(dbPath, + keySerializer::apply, + valueSerializer::apply, + keyDeserializer::apply, + valueDeserializer::apply, + options) } private val db: RocksDB @@ -79,43 +115,43 @@ constructor( db = RocksDB.open(options, dbPath.toAbsolutePath().toString()) } - override suspend fun get(key: Bytes): Bytes? { + override suspend fun get(key: K): V? { if (closed.get()) { throw IllegalStateException("Closed DB") } - val rawValue = db[key.toArrayUnsafe()] + val rawValue = db[keySerializer(key).toArrayUnsafe()] return if (rawValue == null) { null } else { - Bytes.wrap(rawValue) + valueDeserializer(Bytes.wrap(rawValue)) } } - override suspend fun put(key: Bytes, value: Bytes) { + override suspend fun put(key: K, value: V) { if (closed.get()) { throw IllegalStateException("Closed DB") } - db.put(key.toArrayUnsafe(), value.toArrayUnsafe()) + db.put(keySerializer(key).toArrayUnsafe(), valueSerializer(value).toArrayUnsafe()) } - private class BytesIterator(val rIterator: RocksIterator) : Iterator<Bytes> { + private class BytesIterator<K>(val rIterator: RocksIterator, val keyDeserializer: (Bytes) -> K) : Iterator<K> { override fun hasNext(): Boolean = rIterator.isValid - override fun next(): Bytes { - val key = Bytes.wrap(rIterator.key()) + override fun next(): K { + val key = rIterator.key() rIterator.next() - return key + return keyDeserializer(Bytes.wrap(key)) } } - override suspend fun keys(): Iterable<Bytes> { + override suspend fun keys(): Iterable<K> { if (closed.get()) { throw IllegalStateException("Closed DB") } val iter = db.newIterator() iter.seekToFirst() - return Iterable { BytesIterator(iter) } + return Iterable { BytesIterator(iter, keyDeserializer) } } /** diff --git a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt index aa07336..46c6e93 100644 --- a/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt +++ b/kv/src/main/kotlin/org/apache/tuweni/kv/SQLKeyValueStore.kt @@ -38,27 +38,45 @@ import kotlin.coroutines.CoroutineContext * @throws IOException If an I/O error occurs. * @constructor Open a relational database backed key-value store. */ -class SQLKeyValueStore +class SQLKeyValueStore<K, V> @Throws(IOException::class) constructor( jdbcurl: String, val tableName: String = "store", val keyColumn: String = "key", val valueColumn: String = "value", + private val keySerializer: (K) -> Bytes, + private val valueSerializer: (V) -> Bytes, + private val keyDeserializer: (Bytes) -> K, + private val valueDeserializer: (Bytes?) -> V?, override val coroutineContext: CoroutineContext = Dispatchers.IO -) : KeyValueStore { +) : KeyValueStore<K, V> { companion object { /** * Open a relational database backed key-value store. * * @param jdbcUrl The JDBC url to connect to the database. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl) + fun <K, V> open( + jdbcUrl: String, + keySerializer: (K) -> Bytes, + valueSerializer: (V) -> Bytes, + keyDeserializer: (Bytes) -> K, + valueDeserializer: (Bytes?) -> V? + ) = SQLKeyValueStore<K, V>(jdbcurl = jdbcUrl, + keySerializer = keySerializer, + valueSerializer = valueSerializer, + keyDeserializer = keyDeserializer, + valueDeserializer = valueDeserializer) /** * Open a relational database backed key-value store. @@ -67,13 +85,33 @@ constructor( * @param tableName the name of the table to use for storage. * @param keyColumn the key column of the store. * @param valueColumn the value column of the store. + * @param keySerializer the serializer of key objects to bytes + * @param valueSerializer the serializer of value objects to bytes + * @param keyDeserializer the deserializer of keys from bytes + * @param valueDeserializer the deserializer of values from bytes * @return A key-value store. * @throws IOException If an I/O error occurs. */ @JvmStatic @Throws(IOException::class) - fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) = - SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn) + fun <K, V> open( + jdbcUrl: String, + tableName: String, + keyColumn: String, + valueColumn: String, + keySerializer: (K) -> Bytes, + valueSerializer: (V) -> Bytes, + keyDeserializer: (Bytes) -> K, + valueDeserializer: (Bytes?) -> V? + ) = + SQLKeyValueStore<K, V>(jdbcUrl, + tableName, + keyColumn, + valueColumn, + keySerializer, + valueSerializer, + keyDeserializer, + valueDeserializer) } private val connectionPool: BoneCP @@ -85,34 +123,34 @@ constructor( connectionPool = BoneCP(config) } - override suspend fun get(key: Bytes): Bytes? { + override suspend fun get(key: K): V? { connectionPool.asyncConnection.await().use { val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?") - stmt.setBytes(1, key.toArrayUnsafe()) + stmt.setBytes(1, keySerializer(key).toArrayUnsafe()) stmt.execute() val rs = stmt.resultSet return if (rs.next()) { - Bytes.wrap(rs.getBytes(1)) + valueDeserializer(Bytes.wrap(rs.getBytes(1))) } else { null } } } - override suspend fun put(key: Bytes, value: Bytes) { + override suspend fun put(key: K, value: V) { connectionPool.asyncConnection.await().use { val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)") - stmt.setBytes(1, key.toArrayUnsafe()) - stmt.setBytes(2, value.toArrayUnsafe()) + stmt.setBytes(1, keySerializer(key).toArrayUnsafe()) + stmt.setBytes(2, valueSerializer(value).toArrayUnsafe()) it.autoCommit = false try { stmt.execute() } catch (e: SQLException) { val updateStmt = it.prepareStatement("UPDATE $tableName SET $valueColumn=? WHERE $keyColumn=?") - updateStmt.setBytes(1, value.toArrayUnsafe()) - updateStmt.setBytes(2, key.toArrayUnsafe()) + updateStmt.setBytes(1, valueSerializer(value).toArrayUnsafe()) + updateStmt.setBytes(2, keySerializer(key).toArrayUnsafe()) updateStmt.execute() } it.commit() @@ -120,24 +158,24 @@ constructor( } } - private class SQLIterator(val resultSet: ResultSet) : Iterator<Bytes> { + private class SQLIterator<K>(val resultSet: ResultSet, val keyDeserializer: (Bytes) -> K) : Iterator<K> { private var next = resultSet.next() override fun hasNext(): Boolean = next - override fun next(): Bytes { - val key = Bytes.wrap(resultSet.getBytes(1)) + override fun next(): K { + val key = keyDeserializer(Bytes.wrap(resultSet.getBytes(1))) next = resultSet.next() return key } } - override suspend fun keys(): Iterable<Bytes> { + override suspend fun keys(): Iterable<K> { connectionPool.asyncConnection.await().use { val stmt = it.prepareStatement("SELECT $keyColumn FROM $tableName") stmt.execute() - return Iterable { SQLIterator(stmt.resultSet) } + return Iterable { SQLIterator(stmt.resultSet, keyDeserializer) } } } diff --git a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java index b5d8c2a..19e0f37 100644 --- a/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java +++ b/kv/src/test/java/org/apache/tuweni/kv/KeyValueStoreTest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -34,10 +35,12 @@ import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(TempDirectoryExtension.class) class KeyValueStoreTest { + private static final Function<Bytes, Bytes> bytesIdentityFn = Function.identity(); + @Test void testPutAndGet() throws Exception { Map<Bytes, Bytes> map = new HashMap<>(); - KeyValueStore store = MapKeyValueStore.open(map); + KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map); AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13)); completion.join(); Bytes value = store.getAsync(Bytes.of(123)).get(); @@ -49,14 +52,14 @@ class KeyValueStoreTest { @Test void testNoValue() throws Exception { Map<Bytes, Bytes> map = new HashMap<>(); - KeyValueStore store = MapKeyValueStore.open(map); + KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map); assertNull(store.getAsync(Bytes.of(123)).get()); } @Test void testKeys() throws Exception { Map<Bytes, Bytes> map = new HashMap<>(); - KeyValueStore store = MapKeyValueStore.open(map); + KeyValueStore<Bytes, Bytes> store = MapKeyValueStore.open(map); AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13)); completion.join(); Set<Bytes> keys = new HashSet<>(); @@ -66,7 +69,12 @@ class KeyValueStoreTest { @Test void testLevelDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception { - try (LevelDBKeyValueStore leveldb = LevelDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) { + try (LevelDBKeyValueStore<Bytes, Bytes> leveldb = LevelDBKeyValueStore.open( + tempDirectory.resolve("foo").resolve("bar"), + bytesIdentityFn, + bytesIdentityFn, + bytesIdentityFn, + bytesIdentityFn)) { AsyncCompletion completion = leveldb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13)); completion.join(); Bytes value = leveldb.getAsync(Bytes.of(123)).get(); @@ -77,7 +85,12 @@ class KeyValueStoreTest { @Test void testRocksDBWithoutOptions(@TempDirectory Path tempDirectory) throws Exception { - try (RocksDBKeyValueStore rocksdb = RocksDBKeyValueStore.open(tempDirectory.resolve("foo").resolve("bar"))) { + try (RocksDBKeyValueStore<Bytes, Bytes> rocksdb = RocksDBKeyValueStore.open( + tempDirectory.resolve("foo").resolve("bar"), + bytesIdentityFn, + bytesIdentityFn, + bytesIdentityFn, + bytesIdentityFn)) { AsyncCompletion completion = rocksdb.putAsync(Bytes.of(123), Bytes.of(10, 12, 13)); completion.join(); Bytes value = rocksdb.getAsync(Bytes.of(123)).get(); diff --git a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java index c218a65..db44ae1 100644 --- a/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java +++ b/kv/src/test/java/org/apache/tuweni/kv/RedisKeyValueStoreTest.java @@ -22,6 +22,7 @@ import org.apache.tuweni.junit.RedisPort; import org.apache.tuweni.junit.RedisServerExtension; import java.net.InetAddress; +import java.util.function.Function; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; @@ -34,7 +35,8 @@ class RedisKeyValueStoreTest { @Test void testPutAndGet(@RedisPort Integer redisPort) throws Exception { - KeyValueStore store = RedisKeyValueStore.open(redisPort); + KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore + .open(redisPort, Function.identity(), Function.identity(), Function.identity(), Function.identity()); AsyncCompletion completion = store.putAsync(Bytes.of(123), Bytes.of(10, 12, 13)); completion.join(); Bytes value = store.getAsync(Bytes.of(123)).get(); @@ -49,13 +51,24 @@ class RedisKeyValueStoreTest { @Test void testNoValue(@RedisPort Integer redisPort) throws Exception { - KeyValueStore store = RedisKeyValueStore.open(redisPort, InetAddress.getLoopbackAddress()); + KeyValueStore<Bytes, Bytes> store = RedisKeyValueStore.open( + redisPort, + InetAddress.getLoopbackAddress(), + Function.identity(), + Function.identity(), + Function.identity(), + Function.identity()); assertNull(store.getAsync(Bytes.of(124)).get()); } @Test void testRedisCloseable(@RedisPort Integer redisPort) throws Exception { - try (RedisKeyValueStore redis = RedisKeyValueStore.open("redis://127.0.0.1:" + redisPort)) { + try (RedisKeyValueStore<Bytes, Bytes> redis = RedisKeyValueStore.open( + "redis://127.0.0.1:" + redisPort, + Function.identity(), + Function.identity(), + Function.identity(), + Function.identity())) { AsyncCompletion completion = redis.putAsync(Bytes.of(125), Bytes.of(10, 12, 13)); completion.join(); Bytes value = redis.getAsync(Bytes.of(125)).get(); diff --git a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt index ae8c62b..94f4ffb 100644 --- a/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt +++ b/kv/src/test/kotlin/org/apache/tuweni/kv/KeyValueStoreSpec.kt @@ -110,7 +110,7 @@ object InfinispanKeyValueStoreSpec : Spek({ runBlocking { kv.put(bar, foo) kv.put(foo, bar) - val keys = (kv.keys().map { it }) + val keys = (kv.keys().map { it }) keys.should.contain(bar) keys.should.contain(foo) } @@ -120,7 +120,12 @@ object InfinispanKeyValueStoreSpec : Spek({ object MapDBKeyValueStoreSpec : Spek({ val testDir = Files.createTempDirectory("data") - val kv = MapDBKeyValueStore(testDir.resolve("data.db")) + val kv = MapDBKeyValueStore( + testDir.resolve("data.db"), + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) describe("a MapDB-backed key value store") { @@ -141,12 +146,19 @@ object MapDBKeyValueStoreSpec : Spek({ runBlocking { kv.put(foobar, foo) kv.put(foo, bar) - kv.keys().should.equal(setOf(foobar, foo)) + val keys = kv.keys().map { it } + keys.should.contain(foo) + keys.should.contain(foobar) } } it("should not allow usage after the DB is closed") { - val kv2 = MapDBKeyValueStore(testDir.resolve("data2.db")) + val kv2 = MapDBKeyValueStore( + testDir.resolve("data2.db"), + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) kv2.close() runBlocking { var caught = false @@ -168,7 +180,12 @@ object MapDBKeyValueStoreSpec : Spek({ object LevelDBKeyValueStoreSpec : Spek({ val path = Files.createTempDirectory("leveldb") - val kv = LevelDBKeyValueStore(path) + val kv = LevelDBKeyValueStore( + path, + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) afterGroup { kv.close() MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE) @@ -197,7 +214,12 @@ object LevelDBKeyValueStoreSpec : Spek({ } it("should not allow usage after the DB is closed") { - val kv2 = LevelDBKeyValueStore(path.resolve("subdb")) + val kv2 = LevelDBKeyValueStore( + path.resolve("subdb"), + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) kv2.close() runBlocking { var caught = false @@ -214,7 +236,12 @@ object LevelDBKeyValueStoreSpec : Spek({ object RocksDBKeyValueStoreSpec : Spek({ val path = Files.createTempDirectory("rocksdb") - val kv = RocksDBKeyValueStore(path) + val kv = RocksDBKeyValueStore( + path, + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) afterGroup { kv.close() MoreFiles.deleteRecursively(path, RecursiveDeleteOption.ALLOW_INSECURE) @@ -245,7 +272,12 @@ object RocksDBKeyValueStoreSpec : Spek({ } it("should not allow usage after the DB is closed") { - val kv2 = RocksDBKeyValueStore(path.resolve("subdb")) + val kv2 = RocksDBKeyValueStore( + path.resolve("subdb"), + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) kv2.close() runBlocking { var caught = false @@ -269,8 +301,21 @@ object SQLKeyValueStoreSpec : Spek({ st.executeUpdate("create table store(key binary, value binary, primary key(key))") st.executeUpdate("create table store2(id binary, val binary, primary key(id))") } - val kv = SQLKeyValueStore(jdbcUrl) - val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val") + val kv = SQLKeyValueStore( + jdbcUrl, + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) + val otherkv = SQLKeyValueStore.open( + jdbcUrl, + "store2", + "id", + "val", + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) afterGroup { kv.close() otherkv.close() @@ -316,7 +361,12 @@ object SQLKeyValueStoreSpec : Spek({ } it("should not allow usage after the DB is closed") { - val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb") + val kv2 = SQLKeyValueStore( + "jdbc:h2:mem:testdb", + keySerializer = { it }, + valueSerializer = { it }, + keyDeserializer = { it }, + valueDeserializer = { it }) kv2.close() runBlocking { var caught = false --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
