Repository: samza Updated Branches: refs/heads/master 137d0d67e -> a6ada1f6f
SAMZA-543; disable WAL for rocksdb kv store Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6ada1f6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6ada1f6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6ada1f6 Branch: refs/heads/master Commit: a6ada1f6fa4fa401201ad863eda3264c1782e276 Parents: 137d0d6 Author: Navina Ramesh <[email protected]> Authored: Wed Mar 18 18:47:08 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Mar 18 18:47:08 2015 -0700 ---------------------------------------------------------------------- .../RocksDbKeyValueStorageEngineFactory.scala | 4 +- .../samza/storage/kv/RocksDbKeyValueStore.scala | 11 +- .../src/main/config/perf/kv-perf.properties | 28 ++- .../performance/TestKeyValuePerformance.scala | 175 ++++++++++++------- 4 files changed, 142 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index 14eeba5..5ab6859 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -24,6 +24,7 @@ import org.apache.samza.container.SamzaContainerContext import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.storage.kv._ import org.apache.samza.system.SystemStreamPartition +import org.rocksdb.WriteOptions class RocksDbKeyValueStorageEngineFactory [K, V] extends 
BaseKeyValueStorageEngineFactory[K, V] { @@ -44,7 +45,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext) - val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbMetrics) + val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) + val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbWriteOptions, rocksDbMetrics) rocksDb } } http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index eae2a5a..66c2a0d 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -29,7 +29,7 @@ object RocksDbKeyValueStore extends Logging { def options(storeConfig: Config, containerContext: SamzaContainerContext) = { val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L) val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024) - val options = new Options(); + val options = new Options() // Cache size and write buffer size are specified on a per-container basis. 
val numTasks = containerContext.taskNames.size @@ -77,6 +77,7 @@ object RocksDbKeyValueStore extends Logging { class RocksDbKeyValueStore( val dir: File, val options: Options, + val writeOptions: WriteOptions = new WriteOptions(), val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging { private lazy val db = RocksDB.open(options, dir.toString) @@ -97,11 +98,11 @@ class RocksDbKeyValueStore( metrics.puts.inc require(key != null, "Null key not allowed.") if (value == null) { - db.remove(key) + db.remove(writeOptions, key) deletesSinceLastCompaction += 1 } else { metrics.bytesWritten.inc(key.size + value.size) - db.put(key, value) + db.put(writeOptions, key, value) } } @@ -115,12 +116,12 @@ class RocksDbKeyValueStore( val curr = iter.next() if (curr.getValue == null) { deletes += 1 - db.remove(curr.getKey); + db.remove(writeOptions, curr.getKey) } else { val key = curr.getKey val value = curr.getValue metrics.bytesWritten.inc(key.size + value.size) - db.put(key, value) + db.put(writeOptions, key, value) } } metrics.puts.inc(wrote) http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-test/src/main/config/perf/kv-perf.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties index 0d487b1..33fcd8d 100644 --- a/samza-test/src/main/config/perf/kv-perf.properties +++ b/samza-test/src/main/config/perf/kv-perf.properties @@ -15,7 +15,27 @@ # specific language governing permissions and limitations # under the License. 
-stores.test.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory -stores.test.compaction.delete.threshold=1000 -test.partition.count=4 -test.num.loops=1000 +# Config for all-with-deletes +test.all-with-deletes.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory + +test.all-with-deletes.partition.count=4 + +test.all-with-deletes.set.count=1 +test.all-with-deletes.set-1.num.loops=1000 + +# Config for rocksdb-write-performance +test.rocksdb-write-performance.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +test.rocksdb-write-performance.partition.count=4 + +test.rocksdb-write-performance.set.count=3 +test.rocksdb-write-performance.set-1.message.size=256 +test.rocksdb-write-performance.set-1.message.count=1000000 + +test.rocksdb-write-performance.set-2.message.size=512 +test.rocksdb-write-performance.set-2.message.count=1000000 + +test.rocksdb-write-performance.set-3.message.size=1024 +test.rocksdb-write-performance.set-3.message.count=1000000 + +# List of tests to execute +test.methods=rocksdb-write-performance http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index b4fa7d3..0858b98 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -22,114 +22,134 @@ package org.apache.samza.test.performance import org.apache.samza.util.Logging import org.apache.samza.config.Config import org.apache.samza.config.StorageConfig._ -import org.apache.samza.container.{ TaskName, 
SamzaContainerContext } +import org.apache.samza.container.{TaskName, SamzaContainerContext} import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.storage.kv.KeyValueStorageEngine import org.apache.samza.storage.StorageEngineFactory import org.apache.samza.util.CommandLine import org.apache.samza.util.Util -import org.apache.samza.serializers.ByteSerde +import org.apache.samza.serializers.{StringSerde, ByteSerde, SerdeManager} import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.system.SystemProducers import org.apache.samza.system.SystemProducer -import org.apache.samza.serializers.SerdeManager import java.io.File import java.util.UUID +import java.util /** * A simple CLI-based tool for running various key-value performance tests. + * + * List of KeyValuePerformance tests must be defined in 'test.methods' configuration as a comma-separated value. + * The tool splits this list to determine which tests to run. + * + * Each test should define its own configuration (partition count, stores, etc.) + * using the "test.<test-name>.<config-string>=<config-value>" pattern. + * + * Each test may define one or more sets of test parameters. + * For example, test1 can define 2 sets of parameters by specifying "test.test1.set.count=2" and + * define each set as: + * "test.test1.set-1.<param-name>=<param-value>" + * "test.test1.set-2.<param-name>=<param-value>" */ + object TestKeyValuePerformance extends Logging { val Encoding = "UTF-8" - /** - * KeyValuePerformance job configs must define a 'test.method' configuration. - * This configuration must be the value of one of the keys in this map. The - * test uses this key to determine which test to run.
- */ - val testMethods: Map[String, (Config) => Unit] = Map("testAllWithDeletes" -> runTestAllWithDeletes) + val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map( + "all-with-deletes" -> runTestAllWithDeletes, + "rocksdb-write-performance" -> runTestMsgWritePerformance + ) def main(args: Array[String]) { val cmdline = new CommandLine val options = cmdline.parser.parse(args: _*) val config = cmdline.loadConfig(options) - val testMethod = config.get("test.method", "testAllWithDeletes") - - info("Got arguments: %s" format args.toList) - info("Using config: %s" format config) - info("Using test method: %s" format testMethod) - - if (testMethods.contains(testMethod)) { - testMethods(testMethod)(config) - } else { - error("Invalid test method. Valid methods are: %s" format testMethods.keys) + val tests = config.get("test.methods", "rocksdb-write-performance,all-with-deletes").split(",") - throw new SamzaException("Unknown test method: %s" format testMethod) + tests.foreach{ test => + info("Running test: %s" format test) + if(testMethods.contains(test)) { + invokeTest(test, testMethods(test), config.subset("test." + test + ".", true)) + } else { + error("Invalid test method. valid methods are: %s" format testMethods.keys) + throw new SamzaException("Unknown test method: %s" format test) + } } } - /** - * Do wiring for testAllWithDeletes, and run the test. 
- */ - def runTestAllWithDeletes(config: Config) { - val test = new TestKeyValuePerformance - val serde = new ByteSerde - val partitionCount = config.getInt("test.partition.count", 1) - val numLoops = config.getInt("test.num.loops", 100) - val messagesPerBatch = config.getInt("test.messages.per.batch", 10000) - val messageSizeBytes = config.getInt("test.message.size.bytes", 200) - val taskNames = new java.util.ArrayList[TaskName]() - + def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) { + val taskNames = new util.ArrayList[TaskName]() + val partitionCount = config.getInt("partition.count", 1) (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString))) - info("Using partition count: %s" format partitionCount) - info("Using num loops: %s" format numLoops) - info("Using messages per batch: %s" format messagesPerBatch) - info("Using message size: %s bytes" format messageSizeBytes) - + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager + ) // Build a Map[String, StorageEngineFactory]. The key is the store name. - val storageEngineFactories = config + val storageEngineMappings = config .getStoreNames .map(storeName => { - val storageFactoryClassName = config - .getStorageFactoryClassName(storeName) - .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName)) + val storageFactoryClassName = + config.getStorageFactoryClassName(storeName) + .getOrElse(throw new SamzaException("Missing storage factory for %s." 
format storeName)) (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName)) - }).toMap + }) - val producerMultiplexer = new SystemProducers( - Map[String, SystemProducer](), - new SerdeManager) + for((storeName, storageEngine) <- storageEngineMappings) { + val testSetCount = config.getInt("set.count", 1) + (1 to testSetCount).foreach(testSet => { + //Create a new DB instance for each test set + val output = new File("/tmp/" + UUID.randomUUID()) + val byteSerde = new ByteSerde + info("Using output directory %s for %s using %s." format (output, storeName, storageEngine.getClass.getCanonicalName)) + val engine = storageEngine.getStorageEngine( + storeName, + output, + byteSerde, + byteSerde, + new TaskInstanceCollector(producerMultiplexer), + new MetricsRegistryMap, + null, + new SamzaContainerContext(0, config, taskNames) + ) + + val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) { + throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.") + } else { + engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]] + } + + // Run the test method + testMethod(db, config.subset("set-" + testSet + ".", true)) + + info("Cleaning up output directory for %s." format storeName) + Util.rm(output) + }) + } + } - for ((storeName, storageEngine) <- storageEngineFactories) { - val output = new File("/tmp/" + UUID.randomUUID) + def runTestAllWithDeletes(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { + val numLoops = config.getInt("num.loops", 100) + val messagesPerBatch = config.getInt("messages.per.batch", 10000) + val messageSizeBytes = config.getInt("message.size.bytes", 200) - info("Using output directory %s for %s." 
format (output, storeName)) + info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes)) + new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes) - val engine = storageEngine.getStorageEngine( - storeName, - output, - serde, - serde, - new TaskInstanceCollector(producerMultiplexer), - new MetricsRegistryMap, - null, - new SamzaContainerContext(0, config, taskNames)) + } - val db = if (!engine.isInstanceOf[KeyValueStorageEngine[_, _]]) { - throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.") - } else { - engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]] - } + def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { + val messageSizeBytes = config.getInt("message.size", 200) + val messageCount = config.getInt("message.count", 10000) - test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes) - info("Cleaning up output directory for %s." 
format storeName) - Util.rm(output) - } + info("Using (message count, message size in bytes) => (%s, %s)" format (messageCount, messageSizeBytes)) + new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes) } + } class TestKeyValuePerformance extends Logging { @@ -179,4 +199,27 @@ class TestKeyValuePerformance extends Logging { info("Total time: %ss" format ((System.currentTimeMillis - start) * .001)) } + + + /** + * Test that successively writes a set of fixed-size messages to the KV store + * and computes the total time for the operations + * @param store Key-Value store instance that is being tested + * @param numMsgs Total number of messages to write to the store + * @param msgSizeInBytes Size of each message in Bytes + */ + def testMsgWritePerformance( + store: KeyValueStore[Array[Byte], Array[Byte]], + numMsgs: Int = 10000, + msgSizeInBytes: Int = 200) { + + val msg = (0 until msgSizeInBytes).map(i => "x").mkString.getBytes(Encoding) + + val start = System.currentTimeMillis + (0 until numMsgs).foreach(i => { + store.put(i.toString.getBytes(Encoding), msg) + }) + val timeTaken = System.currentTimeMillis - start + info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001)) + } } \ No newline at end of file
