Repository: incubator-samza Updated Branches: refs/heads/0.7.0 760f376c5 -> 6edceb209
SAMZA-254; improve store.all call time in cases where there are a lot of deletes Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/6edceb20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/6edceb20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/6edceb20 Branch: refs/heads/0.7.0 Commit: 6edceb2093fa43f314d9e45c74c953b2b94cb79f Parents: 760f376 Author: Chris Riccomini <[email protected]> Authored: Wed Apr 30 12:25:30 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Apr 30 12:31:41 2014 -0700 ---------------------------------------------------------------------- README.md | 4 + build.gradle | 9 + .../kv/KeyValueStorageEngineFactory.scala | 3 +- .../samza/storage/kv/LevelDbKeyValueStore.scala | 41 ++++- .../samza/storage/kv/TestKeyValueStores.scala | 1 - .../src/main/resources/perf/kv-perf.properties | 21 +++ .../performance/TestKeyValuePerformance.scala | 178 +++++++++++++++++++ 7 files changed, 251 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 7d0ff10..f6bfa9f 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,10 @@ To run a single test: ./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask +To run key-value performance tests: + + ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/resources/perf/kv-perf.properties + ### Job Management To run a job (defined in a properties file): http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 2d45c12..c9aba1d 100644 --- a/build.gradle +++ b/build.gradle 
@@ -166,6 +166,7 @@ project(":samza-shell") { dependencies { gradleShell project(":samza-core_$scalaVersion") gradleShell project(":samza-kafka_$scalaVersion") + gradleShell project(":samza-test_$scalaVersion") gradleShell project(":samza-yarn_$scalaVersion") gradleShell "org.slf4j:slf4j-simple:$slf4jVersion" } @@ -192,6 +193,14 @@ project(":samza-shell") { if (project.hasProperty('configPath')) args += ['--config-path', configPath] if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets] } + + // Usage: ./gradlew samza-shell:kvPerformanceTest + // -PconfigPath=file:///path/to/job/config.properties + task kvPerformanceTest(type:JavaExec) { + main = 'org.apache.samza.test.performance.TestKeyValuePerformance' + classpath = configurations.gradleShell + if (project.hasProperty('configPath')) args += ['--config-path', configPath] + } } project(":samza-kv_$scalaVersion") { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala index 81fe861..36b7e97 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala @@ -44,6 +44,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] { val storageConfig = containerContext.config.subset("stores." 
+ storeName + ".", true) val batchSize = storageConfig.getInt("write.batch.size", 500) val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000)) + val deleteCompactionThreshold = storageConfig.getInt("compaction.delete.threshold", -1) val enableCache = cacheSize > 0 if (cacheSize > 0 && cacheSize < batchSize) { @@ -60,7 +61,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] { val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry) val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, containerContext) - val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, levelDbMetrics) + val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, deleteCompactionThreshold, levelDbMetrics) val maybeLoggedStore = if (changeLogSystemStreamPartition == null) { levelDb } else { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala index 8602a32..dae3c2c 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala @@ -28,7 +28,7 @@ import java.util.Iterator import java.lang.Iterable import org.apache.samza.config.Config import org.apache.samza.container.SamzaContainerContext -import grizzled.slf4j.{Logger, Logging} +import grizzled.slf4j.{ Logger, Logging } object LevelDbKeyValueStore { private lazy val logger = Logger(classOf[LevelDbKeyValueStore]) @@ -45,12 +45,11 @@ object LevelDbKeyValueStore { options.compressionType( storeConfig.get("leveldb.compression", "snappy") match { case "snappy" => CompressionType.SNAPPY - case "none" => 
CompressionType.NONE + case "none" => CompressionType.NONE case _ => logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format storeConfig.get("leveldb.compression", "snappy")) CompressionType.SNAPPY - } - ) + }) options.createIfMissing(true) options.errorIfExists(true) options @@ -60,12 +59,21 @@ object LevelDbKeyValueStore { class LevelDbKeyValueStore( val dir: File, val options: Options, + + /** + * How many deletes must occur before we will force a compaction. This is to + * get around performance issues discovered in SAMZA-254. A value of -1 + * disables this feature. + */ + val deleteCompactionThreshold: Int = -1, val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging { private lazy val db = factory.open(dir, options) private val lexicographic = new LexicographicComparator() + private var deletesSinceLastCompaction = 0 def get(key: Array[Byte]): Array[Byte] = { + maybeCompact metrics.gets.inc require(key != null, "Null key not allowed.") val found = db.get(key) @@ -80,6 +88,7 @@ class LevelDbKeyValueStore( require(key != null, "Null key not allowed.") if (value == null) { db.delete(key) + deletesSinceLastCompaction += 1 } else { metrics.bytesWritten.inc(key.size + value.size) db.put(key, value) @@ -108,6 +117,7 @@ class LevelDbKeyValueStore( batch.close metrics.puts.inc(wrote) metrics.deletes.inc(deletes) + deletesSinceLastCompaction += deletes } def delete(key: Array[Byte]) { @@ -116,18 +126,41 @@ class LevelDbKeyValueStore( } def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { + maybeCompact metrics.ranges.inc require(from != null && to != null, "Null bound not allowed.") new LevelDbRangeIterator(db.iterator, from, to) } def all(): KeyValueIterator[Array[Byte], Array[Byte]] = { + maybeCompact metrics.alls.inc val iter = db.iterator() iter.seekToFirst() new LevelDbIterator(iter) } + /** + * Trigger a complete 
compaction on the LevelDB store if there have been at + * least deleteCompactionThreshold deletes since the last compaction. + */ + def maybeCompact = { + if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) { + compact + } + } + + /** + * Trigger a complete compaction of the LevelDB store. + */ + def compact { + // According to LevelDB's docs: + // begin==NULL is treated as a key before all keys in the database. + // end==NULL is treated as a key after all keys in the database. + db.compactRange(null, null) + deletesSinceLastCompaction = 0 + } + def flush { metrics.flushes.inc // TODO can't find a flush for leveldb http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index 85ba11a..bed9f84 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -307,7 +307,6 @@ class TestKeyValueStores(typeOfStore: String) { */ def s(b: Array[Byte]) = new String(b) - } object TestKeyValueStores { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/samza-test/src/main/resources/perf/kv-perf.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/perf/kv-perf.properties b/samza-test/src/main/resources/perf/kv-perf.properties new file mode 100644 index 0000000..dcc223f --- /dev/null +++ b/samza-test/src/main/resources/perf/kv-perf.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory +stores.test.compaction.delete.threshold=1000 +test.partition.count=4 +test.num.loops=1000 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6edceb20/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala new file mode 100644 index 0000000..5b9b926 --- /dev/null +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.performance + +import grizzled.slf4j.Logging +import org.apache.samza.config.Config +import org.apache.samza.config.StorageConfig._ +import org.apache.samza.container.SamzaContainerContext +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.storage.kv.KeyValueStore +import org.apache.samza.storage.kv.KeyValueStorageEngine +import org.apache.samza.storage.kv.KeyValueStorageEngineFactory +import org.apache.samza.storage.StorageEngineFactory +import org.apache.samza.task.ReadableCollector +import org.apache.samza.util.CommandLine +import org.apache.samza.util.Util +import org.apache.samza.serializers.Serde +import org.apache.samza.serializers.ByteSerde +import org.apache.samza.Partition +import org.apache.samza.SamzaException +import java.io.File +import scala.collection.JavaConversions._ +import java.util.UUID + +/** + * A simple CLI-based tool for running various key-value performance tests. + */ +object TestKeyValuePerformance extends Logging { + val Encoding = "UTF-8" + + /** + * KeyValuePerformance job configs must define a 'test.method' configuration. + * This configuration must be the value of one of the keys in this map. The + * test uses this key to determine which test to run. 
+ */ + val testMethods: Map[String, (Config) => Unit] = Map("testAllWithDeletes" -> runTestAllWithDeletes) + + def main(args: Array[String]) { + val cmdline = new CommandLine + val options = cmdline.parser.parse(args: _*) + val config = cmdline.loadConfig(options) + val testMethod = config.get("test.method", "testAllWithDeletes") + + info("Got arguments: %s" format args.toList) + info("Using config: %s" format config) + info("Using test method: %s" format testMethod) + + if (testMethods.contains(testMethod)) { + testMethods(testMethod)(config) + } else { + error("Invalid test method. Valid methods are: %s" format testMethods.keys) + + throw new SamzaException("Unknown test method: %s" format testMethod) + } + } + + /** + * Do wiring for testAllWithDeletes, and run the test. + */ + def runTestAllWithDeletes(config: Config) { + val test = new TestKeyValuePerformance + val serde = new ByteSerde + val partitionCount = config.getInt("test.partition.count", 1) + val numLoops = config.getInt("test.num.loops", 100) + val messagesPerBatch = config.getInt("test.messages.per.batch", 10000) + val messageSizeBytes = config.getInt("test.message.size.bytes", 200) + val partitions = (0 until partitionCount).map(new Partition(_)) + + info("Using partition count: %s" format partitionCount) + info("Using num loops: %s" format numLoops) + info("Using messages per batch: %s" format messagesPerBatch) + info("Using message size: %s bytes" format messageSizeBytes) + + // Build a Map[String, StorageEngineFactory]. The key is the store name. + val storageEngineFactories = config + .getStoreNames + .map(storeName => { + val storageFactoryClassName = config + .getStorageFactoryClassName(storeName) + .getOrElse(throw new SamzaException("Missing storage factory for %s." 
format storeName)) + (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName)) + }).toMap + + for ((storeName, storageEngine) <- storageEngineFactories) { + val output = new File("/tmp/" + UUID.randomUUID) + + info("Using output directory %s for %s." format (output, storeName)) + + val engine = storageEngine.getStorageEngine( + storeName, + output, + serde, + serde, + new ReadableCollector, + new MetricsRegistryMap, + null, + new SamzaContainerContext("test", config, partitions)) + + val db = if (!engine.isInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]) { + throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.") + } else { + engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]] + } + + test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes) + + info("Cleaning up output directory for %s." format storeName) + + Util.rm(output) + } + } +} + +class TestKeyValuePerformance extends Logging { + import TestKeyValuePerformance._ + + /** + * A test that writes messagesPerBatch messages, deletes them all, then calls + * store.all. The test periodically outputs the time it takes to complete + * these operations. This test is useful to troubleshoot issues with LevelDB + * such as the issue documented in SAMZA-254. + */ + def testAllWithDeletes( + store: KeyValueStore[Array[Byte], Array[Byte]], + + /** + * How many times a batch of messages should be written and deleted. + */ + numLoops: Int = 100, + + /** + * The number of messages to write and delete per-batch. + */ + messagesPerBatch: Int = 10000, + + /** + * The size of the messages to write. 
+ */ + messageSizeBytes: Int = 200) { + + val stuff = (0 until messageSizeBytes).map(i => "a").mkString.getBytes(Encoding) + val start = System.currentTimeMillis + + (0 until numLoops).foreach(i => { + info("(%sms) Total written to store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch)) + + (0 until messagesPerBatch).foreach(j => { + val k = (i * j).toString.getBytes(Encoding) + store.put(k, stuff) + store.delete(k) + }) + + val allStart = System.currentTimeMillis + val iter = store.all + info("(%sms) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart)) + iter.close + }) + + info("Total time: %ss" format ((System.currentTimeMillis - start) * .001)) + } +} \ No newline at end of file
