Repository: samza Updated Branches: refs/heads/master 85184d05b -> 711dd8dc3
SAMZA-1464: Flushing a closed RocksDB store causes SIGSEGVs Made RocksDB operations check if DB is still open to avoid segfaults. Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]>, Xinyu Liu <[email protected]> Closes #334 from prateekm/segfault-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/711dd8dc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/711dd8dc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/711dd8dc Branch: refs/heads/master Commit: 711dd8dc30cdaf76cbc33bbaf7f80bf2b38347c6 Parents: 85184d0 Author: Prateek Maheshwari <[email protected]> Authored: Wed Oct 25 16:27:52 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Oct 25 16:27:52 2017 -0700 ---------------------------------------------------------------------- .../impl/store/TimeSeriesStoreImpl.java | 1 - .../samza/storage/kv/RocksDbKeyValueStore.scala | 90 ++++++++++++++------ .../storage/kv/TestRocksDbKeyValueStore.scala | 36 +++++++- .../src/main/config/perf/kv-perf.properties | 34 +++++++- .../performance/TestKeyValuePerformance.scala | 37 +++++++- 5 files changed, 165 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java index ff7eee9..161f95a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java @@ -164,7 +164,6 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> { @Override public void close() { - kvStore.close(); } private void validateRange(long startTimestamp, long endTimestamp) throws IllegalArgumentException { http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 135cff9..2ae4bb0 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -21,6 +21,7 @@ package org.apache.samza.storage.kv import java.io.File import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.samza.SamzaException import org.apache.samza.config.Config @@ -103,17 +104,25 @@ class RocksDbKeyValueStore( private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics) private val lexicographic = new LexicographicComparator() - def get(key: Array[Byte]): Array[Byte] = { + /** + * null while the store is open. Set to an Exception holding the stacktrace at the time of first close by #close. + * Reads and writes to this field must be guarded by stateChangeLock. + * This is an Exception instead of an Array[StackTraceElement] for ease of logging. + */ + private var stackAtFirstClose: Exception = null + private val stateChangeLock = new ReentrantReadWriteLock() + + def get(key: Array[Byte]): Array[Byte] = ifOpen { metrics.gets.inc require(key != null, "Null key not allowed.") val found = db.get(key) if (found != null) { - metrics.bytesRead.inc(found.size) + metrics.bytesRead.inc(found.length) } found } - def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = { + def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = ifOpen { metrics.getAlls.inc require(keys != null, "Null keys not allowed.") val map = db.multiGet(keys) @@ -123,7 +132,7 @@ class RocksDbKeyValueStore( while (iterator.hasNext) { val value = iterator.next if (value != null) { - bytesRead += value.size + bytesRead += value.length } } metrics.bytesRead.inc(bytesRead) @@ -131,19 +140,19 @@ class RocksDbKeyValueStore( map } - def put(key: Array[Byte], value: Array[Byte]) { + def put(key: Array[Byte], value: Array[Byte]): Unit = ifOpen { metrics.puts.inc require(key != null, "Null key not allowed.") if (value == null) { db.delete(writeOptions, key) } else { - metrics.bytesWritten.inc(key.size + value.size) + metrics.bytesWritten.inc(key.length + value.length) db.put(writeOptions, key, value) } } // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262 - def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) { + def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen { val iter = entries.iterator var wrote = 0 var deletes = 0 @@ -156,7 +165,7 @@ class RocksDbKeyValueStore( } else { val key = curr.getKey val value = curr.getValue - metrics.bytesWritten.inc(key.size + value.size) + metrics.bytesWritten.inc(key.length + value.length) db.put(writeOptions, key, value) } } @@ -164,55 +173,79 @@ class RocksDbKeyValueStore( metrics.deletes.inc(deletes) } - def delete(key: Array[Byte]) { + def delete(key: Array[Byte]): Unit = ifOpen { metrics.deletes.inc put(key, null) } - def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = { + def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = ifOpen { metrics.ranges.inc require(from != null && to != null, "Null bound not allowed.") new RocksDbRangeIterator(db.newIterator(), from, to) } - def all(): KeyValueIterator[Array[Byte], Array[Byte]] = { + def all(): KeyValueIterator[Array[Byte], Array[Byte]] = ifOpen { metrics.alls.inc val iter = db.newIterator() iter.seekToFirst() new RocksDbIterator(iter) } - def flush { + def flush(): Unit = ifOpen { metrics.flushes.inc trace("Flushing store: %s" format storeName) db.flush(flushOptions) trace("Flushed store: %s" format storeName) } - def close() { - trace("Closing.") - db.close() + def close(): Unit = { + stateChangeLock.writeLock().lock() + try { + trace("Closing.") + if (stackAtFirstClose == null) { // first close + stackAtFirstClose = new Exception() + db.close() + } else { + warn(new SamzaException("Close called again on a closed store: %s. Ignoring this close." + + "Stack at first close is under 'Caused By'." format storeName, stackAtFirstClose)) + } + } finally { + stateChangeLock.writeLock().unlock() + } + } + + private def ifOpen[T](fn: => T): T = { + stateChangeLock.readLock().lock() + try { + if (stackAtFirstClose == null) { + fn + } else { + throw new SamzaException("Attempted to access a closed store: %s. " + + "Stack at first close is under 'Caused By'." format storeName, stackAtFirstClose) + } + } finally { + stateChangeLock.readLock().unlock() + } } class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] { private var open = true - private var firstValueAccessed = false - override def close() = { + override def close() = ifOpen { open = false iter.close() } override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove") - override def hasNext() = iter.isValid + override def hasNext() = ifOpen(iter.isValid) // The iterator is already pointing to the next element - protected def peekKey() = { + protected def peekKey() = ifOpen { getEntry().getKey } - protected def getEntry() = { + protected def getEntry() = ifOpen { val key = iter.key val value = iter.value new Entry(key, value) @@ -224,21 +257,21 @@ class RocksDbKeyValueStore( // current element we are pointing to and advance the iterator to the next // location (The new location may or may not be valid - this will surface // when the next next() call is made, the isValid will fail) - override def next() = { + override def next(): Entry[Array[Byte], Array[Byte]] = ifOpen { if (!hasNext()) { throw new NoSuchElementException } val entry = getEntry() - iter.next - metrics.bytesRead.inc(entry.getKey.size) + iter.next() + metrics.bytesRead.inc(entry.getKey.length) if (entry.getValue != null) { - metrics.bytesRead.inc(entry.getValue.size) + metrics.bytesRead.inc(entry.getValue.length) } entry } - override def finalize() { + override def finalize(): Unit = ifOpen { if (open) { trace("Leaked reference to RocksDB iterator, forcing close.") close() @@ -250,9 +283,10 @@ class RocksDbKeyValueStore( // RocksDB's JNI interface does not expose getters/setters that allow the // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. - val comparator = lexicographic - iter.seek(from) - override def hasNext() = { + val comparator: LexicographicComparator = lexicographic + ifOpen(iter.seek(from)) + + override def hasNext() = ifOpen { super.hasNext() && comparator.compare(peekKey(), to) < 0 } } http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala index 994387f..418e986 100644 --- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala @@ -23,11 +23,12 @@ package org.apache.samza.storage.kv import java.io.File import java.util +import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} import org.apache.samza.util.ExponentialSleepStrategy import org.junit.{Assert, Test} -import org.rocksdb.{RocksIterator, RocksDB, FlushOptions, Options} +import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator} class TestRocksDbKeyValueStore { @@ -89,6 +90,39 @@ class TestRocksDbKeyValueStore rocksDBReadOnly.close() } + @Test(expected = classOf[SamzaException]) + def testFlushAfterCloseThrowsException(): Unit = { + val map = new util.HashMap[String, String]() + val config = new MapConfig(map) + val options = new Options() + options.setCreateIfMissing(true) + + val dbDir = new File(System.getProperty("java.io.tmpdir")) + val rocksDB = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore") + + val key = "key".getBytes("UTF-8") + rocksDB.put(key, "val".getBytes("UTF-8")) + + rocksDB.close() + rocksDB.flush() + } + + @Test(expected = classOf[SamzaException]) + def testGetAfterCloseThrowsException(): Unit = { + val map = new util.HashMap[String, String]() + val config = new MapConfig(map) + val options = new Options() + options.setCreateIfMissing(true) + + val dbDir = new File(System.getProperty("java.io.tmpdir")) + val rocksDB = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore") + + rocksDB.close() + + val key = "key".getBytes("UTF-8") + rocksDB.get(key) + } + @Test def testIteratorWithRemoval(): Unit = { val lock = new Object http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-test/src/main/config/perf/kv-perf.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties index 7339052..7349c93 100644 --- a/samza-test/src/main/config/perf/kv-perf.properties +++ b/samza-test/src/main/config/perf/kv-perf.properties @@ -37,6 +37,38 @@ test.rocksdb-write-performance.set-2.message.count=1000000 test.rocksdb-write-performance.set-3.message.size=1024 test.rocksdb-write-performance.set-3.message.count=1000000 + +# Config for rocksdb-concurrent-write-performance +test.rocksdb-concurrent-write-performance.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory +test.rocksdb-concurrent-write-performance.stores.test-store.object.cache.size=0 +test.rocksdb-concurrent-write-performance.partition.count=4 + +test.rocksdb-concurrent-write-performance.set.count=6 +test.rocksdb-concurrent-write-performance.set-1.message.size=256 +test.rocksdb-concurrent-write-performance.set-1.message.count=1000000 +test.rocksdb-concurrent-write-performance.set-1.num.threads=4 + +test.rocksdb-concurrent-write-performance.set-2.message.size=512 +test.rocksdb-concurrent-write-performance.set-2.message.count=1000000 +test.rocksdb-concurrent-write-performance.set-2.num.threads=4 + +test.rocksdb-concurrent-write-performance.set-3.message.size=1024 +test.rocksdb-concurrent-write-performance.set-3.message.count=1000000 +test.rocksdb-concurrent-write-performance.set-3.num.threads=4 + +test.rocksdb-concurrent-write-performance.set-4.message.size=256 +test.rocksdb-concurrent-write-performance.set-4.message.count=4000000 +test.rocksdb-concurrent-write-performance.set-4.num.threads=1 + +test.rocksdb-concurrent-write-performance.set-5.message.size=512 +test.rocksdb-concurrent-write-performance.set-5.message.count=4000000 +test.rocksdb-concurrent-write-performance.set-5.num.threads=1 + +test.rocksdb-concurrent-write-performance.set-6.message.size=1024 +test.rocksdb-concurrent-write-performance.set-6.message.count=4000000 +test.rocksdb-concurrent-write-performance.set-6.num.threads=1 + + # Config for get-all-vs-get test.get-all-vs-get-write-many-read-many.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory # Disable caching @@ -52,4 +84,4 @@ test.get-all-vs-get-write-once-read-many.partition.count=4 test.get-all-vs-get-write-once-read-many.set.count=3 # List of tests to execute -test.methods=rocksdb-write-performance,get-all-vs-get-write-many-read-many,get-all-vs-get-write-once-read-many +test.methods=rocksdb-write-performance,rocksdb-concurrent-write-performance,get-all-vs-get-write-many-read-many,get-all-vs-get-write-once-read-many http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 8e853b7..d481782 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -62,6 +62,7 @@ object TestKeyValuePerformance extends Logging { val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map( "all-with-deletes" -> runTestAllWithDeletes, "rocksdb-write-performance" -> runTestMsgWritePerformance, + "rocksdb-concurrent-write-performance" -> runTestConcurrentMsgWritePerformance, "get-all-vs-get-write-many-read-many" -> runTestGetAllVsGetWriteManyReadMany, "get-all-vs-get-write-once-read-many" -> runTestGetAllVsGetWriteOnceReadMany) @@ -107,7 +108,6 @@ object TestKeyValuePerformance extends Logging { //Create a new DB instance for each test set val output = new File("/tmp/" + UUID.randomUUID()) val byteSerde = new ByteSerde - info("Using output directory %s for %s using %s." format (output, storeName, storageEngine.getClass.getCanonicalName)) val engine = storageEngine.getStorageEngine( storeName, output, @@ -128,7 +128,6 @@ object TestKeyValuePerformance extends Logging { // Run the test method testMethod(db, config.subset("set-" + testSet + ".", true)) - info("Cleaning up output directory for %s." format storeName) Util.rm(output) }) } @@ -151,6 +150,14 @@ object TestKeyValuePerformance extends Logging { new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes) } + def runTestConcurrentMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { + val messageSizeBytes = config.getInt("message.size", 200) + val messageCount = config.getInt("message.count", 100000) + val numThreads = config.getInt("num.threads", 4) + + new TestKeyValuePerformance().testConcurrentMsgWritePerformance(db, messageCount, messageSizeBytes, numThreads) + } + def runTestGetAllVsGetWriteManyReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) { new TestKeyValuePerformance().testGetAllVsGetWriteManyReadMany(db, config) } @@ -230,6 +237,32 @@ class TestKeyValuePerformance extends Logging { info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001)) } + def testConcurrentMsgWritePerformance( + store: KeyValueStore[Array[Byte], Array[Byte]], + numMsgs: Int = 100000, + msgSizeInBytes: Int = 200, + numThreads: Int = 4) { + + val msg = (0 until msgSizeInBytes).map(i => "x").mkString.getBytes(Encoding) + def createThread(name: String): Thread = new Thread(new Runnable { + override def run() = { + (0 until numMsgs).foreach(i => { + store.put(i.toString.getBytes(Encoding), msg) + }) + } + }, name) + + val threads = (0 until numThreads).map(i => createThread(s"Writer $i")) + + val start = System.currentTimeMillis + threads.foreach(_.start()) + threads.foreach(_.join()) + val timeTaken = System.currentTimeMillis - start + + info("Total time to write %d msgs of size %d bytes with %s threads: %s sec" + format (numMsgs, msgSizeInBytes, numThreads, timeTaken * .001)) + } + /** * Test that ::getAll performance is better than that of ::get (test when there are many writes and many reads). * @param store key-value store instance that is being tested
