Repository: incubator-samza Updated Branches: refs/heads/master a66a66c53 -> 9147c343b
SAMZA-236; add a rocksdb state implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9147c343 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9147c343 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9147c343 Branch: refs/heads/master Commit: 9147c343be9a85d49630bd7522c79e825f5682f2 Parents: a66a66c Author: Naveen Somasundaram <[email protected]> Authored: Thu Oct 16 08:48:12 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu Oct 16 08:48:12 2014 -0700 ---------------------------------------------------------------------- build.gradle | 18 +- gradle/dependency-versions.gradle | 1 + .../samza/util/LexicographicComparator.scala | 39 ++++ .../samza/storage/kv/LevelDbKeyValueStore.scala | 17 +- .../RocksDbKeyValueStorageEngineFactory.scala | 50 +++++ .../samza/storage/kv/RocksDbKeyValueStore.scala | 224 +++++++++++++++++++ .../performance/TestKeyValuePerformance.scala | 2 - .../samza/storage/kv/TestKeyValueStores.scala | 85 ++++--- settings.gradle | 1 + 9 files changed, 386 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 1b37dbb..99f4354 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,8 @@ allprojects { } mavenCentral() mavenLocal() - } + } + } apply from: file("gradle/dependency-versions.gradle") @@ -345,6 +346,20 @@ project(":samza-kv-leveldb_$scalaVersion") { } } +project(":samza-kv-rocksdb_$scalaVersion") { + apply plugin: 'scala' + + dependencies { + compile project(':samza-api') + compile project(":samza-core_$scalaVersion") + compile project(":samza-kv_$scalaVersion") + compile "org.scala-lang:scala-library:$scalaLibVersion" + compile "org.rocksdb:rocksdbjni:$rocksdbVersion" + testCompile "junit:junit:$junitVersion" + testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" + } +} + project(":samza-test_$scalaVersion") { apply plugin: 'scala' @@ -359,6 +374,7 @@ project(":samza-test_$scalaVersion") { compile project(':samza-api') compile project(":samza-kv-inmemory_$scalaVersion") compile project(":samza-kv-leveldb_$scalaVersion") + compile project(":samza-kv-rocksdb_$scalaVersion") compile project(":samza-core_$scalaVersion") compile "org.scala-lang:scala-library:$scalaLibVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index fe2e446..44dd426 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -29,6 +29,7 @@ kafkaVersion = "0.8.1.1" commonsHttpClientVersion = "3.1" leveldbVersion = "1.8" + rocksdbVersion = "3.5.1" yarnVersion = "2.4.0" slf4jVersion = "1.6.2" log4jVersion = "1.2.17" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala new file mode 100644 index 0000000..93c5220 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.Comparator + +/** + * A comparator that applies a lexicographical comparison on byte arrays. + */ +class LexicographicComparator extends Comparator[Array[Byte]] { + def compare(k1: Array[Byte], k2: Array[Byte]): Int = { + val l = math.min(k1.length, k2.length) + var i = 0 + while (i < l) { + if (k1(i) != k2(i)) + return (k1(i) & 0xff) - (k2(i) & 0xff) + i += 1 + } + // okay prefixes are equal, the shorter array is less + k1.length - k2.length + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala index 853de12..f4a021a 100644 --- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala +++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala @@ -24,7 +24,7 @@ import org.fusesource.leveldbjni.JniDBFactory._ import java.io._ import org.apache.samza.config.Config import org.apache.samza.container.SamzaContainerContext -import org.apache.samza.util.Logging +import org.apache.samza.util.{LexicographicComparator, Logging} object LevelDbKeyValueStore extends Logging { @@ -64,7 +64,7 @@ class LevelDbKeyValueStore( val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging { private lazy val db = factory.open(dir, options) - private val lexicographic = new LexicographicComparator() + private val lexicographic = new LevelDBLexicographicComparator() private var deletesSinceLastCompaction = 0 def get(key: Array[Byte]): Array[Byte] = { @@ -209,18 +209,7 @@ class LevelDbKeyValueStore( /** * Compare two array lexicographically using unsigned byte arithmetic */ - class LexicographicComparator extends DBComparator { - def compare(k1: Array[Byte], k2: Array[Byte]): Int = { - val l = math.min(k1.length, k2.length) - var i = 0 - while (i < l) { - if (k1(i) != k2(i)) - return (k1(i) & 0xff) - (k2(i) & 0xff) - i += 1 - } - // okay prefixes are equal, the shorter array is less - k1.length - k2.length - } + class LevelDBLexicographicComparator extends LexicographicComparator with DBComparator { def name(): String = "lexicographic" def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start def findShortSuccessor(key: Array[Byte]) = key http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala new file mode 100644 index 0000000..a52731b --- /dev/null +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage + +import java.io.File +import org.apache.samza.container.SamzaContainerContext +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.storage.kv._ +import org.apache.samza.system.SystemStreamPartition + +class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] +{ + /** + * Return a KeyValueStore instance for the given store name + * @param storeName Name of the store + * @param storeDir The directory of the store + * @param registry MetricsRegistry to which to publish store specific metrics. + * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog. + * @param containerContext Information about the container in which the task is executing. + * @return A valid KeyValueStore instance + */ + override def getKVStore(storeName: String, + storeDir: File, + registry: MetricsRegistry, + changeLogSystemStreamPartition: SystemStreamPartition, + containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = { + val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) + val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) + val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext) + val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbMetrics) + rocksDb + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala new file mode 100644 index 0000000..4b23c9b --- /dev/null +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv + +import java.io.File +import org.apache.samza.util.{ LexicographicComparator, Logging } +import org.apache.samza.config.Config +import org.apache.samza.container.SamzaContainerContext +import org.rocksdb._ + +object RocksDbKeyValueStore extends Logging { + def options(storeConfig: Config, containerContext: SamzaContainerContext) = { + val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L) + val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024) + val options = new Options(); + + // Cache size and write buffer size are specified on a per-container basis. + val numTasks = containerContext.taskNames.size + options.setWriteBufferSize((writeBufSize / numTasks).toInt) + var cacheSizePerContainer = cacheSize / numTasks + options.setCompressionType( + storeConfig.get("rocksdb.compression", "snappy") match { + case "snappy" => CompressionType.SNAPPY_COMPRESSION + case "bzip2" => CompressionType.BZLIB2_COMPRESSION + case "zlib" => CompressionType.ZLIB_COMPRESSION + case "lz4" => CompressionType.LZ4_COMPRESSION + case "lz4hc" => CompressionType.LZ4HC_COMPRESSION + case "none" => CompressionType.NO_COMPRESSION + case _ => + warn("Unknown rocksdb.compression codec %s, defaulting to Snappy" format storeConfig.get("rocksdb.compression", "snappy")) + CompressionType.SNAPPY_COMPRESSION + }) + + val blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096) + // We compute the cache size based on the overall container cache size, + // however, if rocksdb.cache.size.bytes is overridden, then we use that. + cacheSizePerContainer = storeConfig.getLong("rocksdb.cache.size.bytes", cacheSizePerContainer) + val bloomBits = storeConfig.getInt("rocksdb.bloomfilter.bits", 10) + val table_options = new BlockBasedTableConfig() + table_options.setBlockCacheSize(cacheSizePerContainer) + .setBlockSize(blockSize) + .setFilterBitsPerKey(bloomBits) + + options.setTableFormatConfig(table_options) + options.setCompactionStyle( + storeConfig.get("rocksdb.compaction.style", "universal") match { + case "universal" => CompactionStyle.UNIVERSAL + case "fifo" => CompactionStyle.FIFO + case "level" => CompactionStyle.LEVEL + case _ => + warn("Unknown rocksdb.compactionStyle %s, defaulting to universal" format storeConfig.get("rocksdb.compaction.style", "universal")) + CompactionStyle.UNIVERSAL + }) + + options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt) + options.setCreateIfMissing(true) + options.setErrorIfExists(true) + options + } + +} + +class RocksDbKeyValueStore( + val dir: File, + val options: Options, + val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging { + + private lazy val db = RocksDB.open(options, dir.toString) + private val lexicographic = new LexicographicComparator() + private var deletesSinceLastCompaction = 0 + + def get(key: Array[Byte]): Array[Byte] = { + metrics.gets.inc + require(key != null, "Null key not allowed.") + val found = db.get(key) + if (found != null) { + metrics.bytesRead.inc(found.size) + } + found + } + + def put(key: Array[Byte], value: Array[Byte]) { + metrics.puts.inc + require(key != null, "Null key not allowed.") + if (value == null) { + db.remove(key) + deletesSinceLastCompaction += 1 + } else { + metrics.bytesWritten.inc(key.size + value.size) + db.put(key, value) + } + } + + // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262 + def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) { + val iter = entries.iterator + var wrote = 0 + var deletes = 0 + while (iter.hasNext) { + wrote += 1 + val curr = iter.next() + if (curr.getValue == null) { + deletes += 1 + db.remove(curr.getKey); + } else { + val key = curr.getKey + val value = curr.getValue + metrics.bytesWritten.inc(key.size + value.size) + db.put(key, value) + } + } + metrics.puts.inc(wrote) + metrics.deletes.inc(deletes) + deletesSinceLastCompaction += deletes + } + + def delete(key: Array[Byte]) { + metrics.deletes.inc + put(key, null) + } + + def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { + metrics.ranges.inc + require(from != null && to != null, "Null bound not allowed.") + new RocksDbRangeIterator(db.newIterator(), from, to) + } + + def all(): KeyValueIterator[Array[Byte], Array[Byte]] = { + metrics.alls.inc + val iter = db.newIterator() + iter.seekToFirst() + new RocksDbIterator(iter) + } + + def flush { + metrics.flushes.inc + // TODO still not exposed in Java RocksDB API, follow up with rocksDB team + trace("Flush in RocksDbKeyValueStore is not supported, ignoring") + } + + def close() { + trace("Closing.") + db.close() + } + + class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] { + private var open = true + private var firstValueAccessed = false; + def close() = { + open = false + iter.dispose() + } + + def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove"); + + def hasNext() = iter.isValid + + // The iterator is already pointing to the next element + protected def peekKey() = { + getEntry().getKey + } + + protected def getEntry() = { + val key = iter.key + val value = iter.value + new Entry(key, value) + } + + // By virtue of how RocksdbIterator is implemented, the implementation of + // our iterator is slightly different from standard java iterator next will + // always point to the current element, when next is called, we return the + // current element we are pointing to and advance the iterator to the next + // location (The new location may or may not be valid - this will surface + // when the next next() call is made, the isValid will fail) + def next() = { + if (!hasNext()) { + throw new NoSuchElementException + } + + val entry = getEntry() + iter.next + metrics.bytesRead.inc(entry.getKey.size) + if (entry.getValue != null) { + metrics.bytesRead.inc(entry.getValue.size) + } + entry + } + + override def finalize() { + if (open) { + trace("Leaked reference to level db iterator, forcing close.") + close() + } + } + } + + class RocksDbRangeIterator(iter: RocksIterator, from: Array[Byte], to: Array[Byte]) extends RocksDbIterator(iter) { + // RocksDB's JNI interface does not expose getters/setters that allow the + // comparator to be pluggable, and the default is lexicographic, so it's + // safe to just force lexicographic comparator here for now. + val comparator = lexicographic + iter.seek(from) + override def hasNext() = { + super.hasNext() && comparator.compare(peekKey(), to) < 0 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 8fd33f1..68e9e66 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -126,9 +126,7 @@ object TestKeyValuePerformance extends Logging { } test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes) - info("Cleaning up output directory for %s." format storeName) - Util.rm(output) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index eefe114..2082610 100644 --- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -55,36 +55,38 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { @Before def setup() { - val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = if ("leveldb".equals(typeOfStore)) { - dir.mkdirs() - val leveldb = new LevelDbKeyValueStore(dir, new Options) - leveldb - } else if ("inmemory".equals(typeOfStore)) { - val inmemoryDb = new InMemoryKeyValueStore - inmemoryDb - } else { - throw new IllegalArgumentException("Type of store undefined: " + typeOfStore) + val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match { + case "leveldb" => + dir.mkdirs () + new LevelDbKeyValueStore (dir, new Options) + case "inmemory" => + new InMemoryKeyValueStore + case "rocksdb" => + new RocksDbKeyValueStore (dir, new org.rocksdb.Options().setCreateIfMissing(true).setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION)) + case _ => + throw new IllegalArgumentException("Type of store undefined: " + typeOfStore) } val passThroughSerde = new Serde[Array[Byte]] { def toBytes(obj: Array[Byte]) = obj def fromBytes(bytes: Array[Byte]) = bytes } - store = if ("cache".equals(storeConfig)) { - cache = true - new CachedStore(kvStore, CacheSize, BatchSize) - } else if ("serde".equals(storeConfig)) { - serde = true - new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) - } else if ("cache-and-serde".equals(storeConfig)) { - val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) - serde = true - cache = true - new CachedStore(serializedStore, CacheSize, BatchSize) - } else { - kvStore - } + store = storeConfig match { + case "cache" => + cache = true + new CachedStore(kvStore, CacheSize, BatchSize) + case "serde" => + serde = true + new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) + case "cache-and-serde" => + val serializedStore = new SerializedKeyValueStore(kvStore, passThroughSerde, passThroughSerde) + serde = true + cache = true + new CachedStore(serializedStore, CacheSize, BatchSize) + case _ => + kvStore + } store = new NullSafeKeyValueStore(store) } @@ -215,14 +217,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { def testBrokenScalaDoubleLinkedList() { val something = b("") val keys = letters - .map(b(_)) - .toArray + .map(b(_)) + .toArray // Load the cache to capacity. letters - .slice(0, TestKeyValueStores.CacheSize) - .map(b(_)) - .foreach(store.put(_, something)) + .slice(0, TestKeyValueStores.CacheSize) + .map(b(_)) + .foreach(store.put(_, something)) // Now keep everything in the cache, but with an empty dirty list. store.flush @@ -247,9 +249,9 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { // Get rid of 1 from the cache by reading every other element, and then // putting one new element. letters - .slice(2, TestKeyValueStores.CacheSize) - .map(b(_)) - .foreach(store.get(_)) + .slice(2, TestKeyValueStores.CacheSize) + .map(b(_)) + .foreach(store.get(_)) store.put(keys(TestKeyValueStores.CacheSize), something) // Now try and trigger an NPE since the dirty list has an element (1) @@ -266,8 +268,8 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { // Make test deterministic by seeding the random number generator. val rand = new Random(12345) val keys = letters - .map(b(_)) - .toArray + .map(b(_)) + .toArray // Map from letter to key byte array used for letter, and expected value. // We have to go through some acrobatics here since Java's byte array uses @@ -340,5 +342,20 @@ object TestKeyValueStores { val CacheSize = 10 val BatchSize = 5 @Parameters - def parameters: java.util.Collection[Array[String]] = Arrays.asList(Array("leveldb", "cache"), Array("leveldb", "serde"), Array("leveldb", "cache-and-serde"), Array("leveldb", "none"), Array("inmemory", "cache"), Array("inmemory", "serde"), Array("inmemory", "cache-and-serde"), Array("inmemory", "none")) + def parameters: java.util.Collection[Array[String]] = Arrays.asList( + //LevelDB + Array("leveldb", "cache"), + Array("leveldb", "serde"), + Array("leveldb", "cache-and-serde"), + Array("leveldb", "none"), + //Inmemory + Array("inmemory", "cache"), + Array("inmemory", "serde"), + Array("inmemory", "cache-and-serde"), + Array("inmemory", "none"), + //RocksDB + Array("rocksdb","cache"), + Array("rocksdb","serde"), + Array("rocksdb","cache-and-serde"), + Array("rocksdb","none")) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9147c343/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 325cac2..216c5ee 100644 --- a/settings.gradle +++ b/settings.gradle @@ -23,6 +23,7 @@ include \ 'samza-kv', 'samza-kv-inmemory', 'samza-kv-leveldb', + 'samza-kv-rocksdb', 'samza-log4j', 'samza-serializers', 'samza-shell',
