Repository: samza
Updated Branches:
  refs/heads/master 85184d05b -> 711dd8dc3


SAMZA-1464: Flushing a closed RocksDB store causes SIGSEGVs

Made RocksDB store operations check whether the DB is still open before accessing it, to avoid segfaults.
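
In outline, every store operation now runs under a read lock and close() under the matching write lock, so a close waits for in-flight operations and later calls fail fast instead of touching a freed native handle. A minimal sketch of the pattern (illustrative names only; the actual change below additionally records the stack of the first close for diagnostics):

    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Sketch of the guard pattern: operations share the read lock, close()
    // takes the write lock. close() therefore waits for in-flight operations,
    // and nothing touches the native handle after it is freed. The read lock
    // is reentrant, so guarded ops may call each other (e.g. delete -> put).
    class GuardedHandle(doClose: () => Unit) {
      private val stateChangeLock = new ReentrantReadWriteLock()
      private var closed = false

      def ifOpen[T](fn: => T): T = {
        stateChangeLock.readLock().lock()
        try {
          if (closed) throw new IllegalStateException("Already closed.")
          fn
        } finally {
          stateChangeLock.readLock().unlock()
        }
      }

      def close(): Unit = {
        stateChangeLock.writeLock().lock()
        try {
          if (!closed) { closed = true; doClose() } // close exactly once
        } finally {
          stateChangeLock.writeLock().unlock()
        }
      }
    }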

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>, Xinyu Liu <[email protected]>

Closes #334 from prateekm/segfault-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/711dd8dc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/711dd8dc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/711dd8dc

Branch: refs/heads/master
Commit: 711dd8dc30cdaf76cbc33bbaf7f80bf2b38347c6
Parents: 85184d0
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Oct 25 16:27:52 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Oct 25 16:27:52 2017 -0700

----------------------------------------------------------------------
 .../impl/store/TimeSeriesStoreImpl.java         |  1 -
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 90 ++++++++++++++------
 .../storage/kv/TestRocksDbKeyValueStore.scala   | 36 +++++++-
 .../src/main/config/perf/kv-perf.properties     | 34 +++++++-
 .../performance/TestKeyValuePerformance.scala   | 37 +++++++-
 5 files changed, 165 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
index ff7eee9..161f95a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
@@ -164,7 +164,6 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> {
 
   @Override
   public void close() {
-    kvStore.close();
   }
 
   private void validateRange(long startTimestamp, long endTimestamp) throws IllegalArgumentException {

http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 135cff9..2ae4bb0 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv
 
 import java.io.File
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
@@ -103,17 +104,25 @@ class RocksDbKeyValueStore(
   private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics)
   private val lexicographic = new LexicographicComparator()
 
-  def get(key: Array[Byte]): Array[Byte] = {
+  /**
+    * null while the store is open. Set to an Exception holding the stacktrace at the time of first close by #close.
+    * Reads and writes to this field must be guarded by stateChangeLock.
+    * This is an Exception instead of an Array[StackTraceElement] for ease of logging.
+    */
+  private var stackAtFirstClose: Exception = null
+  private val stateChangeLock = new ReentrantReadWriteLock()
+
+  def get(key: Array[Byte]): Array[Byte] = ifOpen {
     metrics.gets.inc
     require(key != null, "Null key not allowed.")
     val found = db.get(key)
     if (found != null) {
-      metrics.bytesRead.inc(found.size)
+      metrics.bytesRead.inc(found.length)
     }
     found
   }
 
-  def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = {
+  def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = ifOpen {
     metrics.getAlls.inc
     require(keys != null, "Null keys not allowed.")
     val map = db.multiGet(keys)
@@ -123,7 +132,7 @@ class RocksDbKeyValueStore(
       while (iterator.hasNext) {
         val value = iterator.next
         if (value != null) {
-          bytesRead += value.size
+          bytesRead += value.length
         }
       }
       metrics.bytesRead.inc(bytesRead)
@@ -131,19 +140,19 @@ class RocksDbKeyValueStore(
     map
   }
 
-  def put(key: Array[Byte], value: Array[Byte]) {
+  def put(key: Array[Byte], value: Array[Byte]): Unit = ifOpen {
     metrics.puts.inc
     require(key != null, "Null key not allowed.")
     if (value == null) {
       db.delete(writeOptions, key)
     } else {
-      metrics.bytesWritten.inc(key.size + value.size)
+      metrics.bytesWritten.inc(key.length + value.length)
       db.put(writeOptions, key, value)
     }
   }
 
   // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
-  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
+  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
     val iter = entries.iterator
     var wrote = 0
     var deletes = 0
@@ -156,7 +165,7 @@ class RocksDbKeyValueStore(
       } else {
         val key = curr.getKey
         val value = curr.getValue
-        metrics.bytesWritten.inc(key.size + value.size)
+        metrics.bytesWritten.inc(key.length + value.length)
         db.put(writeOptions, key, value)
       }
     }
@@ -164,55 +173,79 @@ class RocksDbKeyValueStore(
     metrics.deletes.inc(deletes)
   }
 
-  def delete(key: Array[Byte]) {
+  def delete(key: Array[Byte]): Unit = ifOpen {
     metrics.deletes.inc
     put(key, null)
   }
 
-  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = ifOpen {
     metrics.ranges.inc
     require(from != null && to != null, "Null bound not allowed.")
     new RocksDbRangeIterator(db.newIterator(), from, to)
   }
 
-  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = ifOpen {
     metrics.alls.inc
     val iter = db.newIterator()
     iter.seekToFirst()
     new RocksDbIterator(iter)
   }
 
-  def flush {
+  def flush(): Unit = ifOpen {
     metrics.flushes.inc
     trace("Flushing store: %s" format storeName)
     db.flush(flushOptions)
     trace("Flushed store: %s" format storeName)
   }
 
-  def close() {
-    trace("Closing.")
-    db.close()
+  def close(): Unit = {
+    stateChangeLock.writeLock().lock()
+    try {
+      trace("Closing.")
+      if (stackAtFirstClose == null) { // first close
+        stackAtFirstClose = new Exception()
+        db.close()
+      } else {
+        warn(new SamzaException("Close called again on a closed store: %s. Ignoring this close." +
+          "Stack at first close is under 'Caused By'." format storeName, stackAtFirstClose))
+      }
+    } finally {
+      stateChangeLock.writeLock().unlock()
+    }
+  }
+
+  private def ifOpen[T](fn: => T): T = {
+    stateChangeLock.readLock().lock()
+    try {
+      if (stackAtFirstClose == null) {
+        fn
+      } else {
+        throw new SamzaException("Attempted to access a closed store: %s. " +
+          "Stack at first close is under 'Caused By'." format storeName, 
stackAtFirstClose)
+      }
+    } finally {
+      stateChangeLock.readLock().unlock()
+    }
   }
 
   class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
     private var open = true
-    private var firstValueAccessed = false
 
-    override def close() = {
+    override def close() = ifOpen {
       open = false
       iter.close()
     }
 
     override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")
 
-    override def hasNext() = iter.isValid
+    override def hasNext() = ifOpen(iter.isValid)
 
     // The iterator is already pointing to the next element
-    protected def peekKey() = {
+    protected def peekKey() = ifOpen {
       getEntry().getKey
     }
 
-    protected def getEntry() = {
+    protected def getEntry() = ifOpen {
       val key = iter.key
       val value = iter.value
       new Entry(key, value)
@@ -224,21 +257,21 @@ class RocksDbKeyValueStore(
     // current element we are pointing to and advance the iterator to the next 
     // location (The new location may or may not be valid - this will surface
     // when the next next() call is made, the isValid will fail)
-    override def next() = {
+    override def next(): Entry[Array[Byte], Array[Byte]] = ifOpen {
       if (!hasNext()) {
         throw new NoSuchElementException
       }
 
       val entry = getEntry()
-      iter.next
-      metrics.bytesRead.inc(entry.getKey.size)
+      iter.next()
+      metrics.bytesRead.inc(entry.getKey.length)
       if (entry.getValue != null) {
-        metrics.bytesRead.inc(entry.getValue.size)
+        metrics.bytesRead.inc(entry.getValue.length)
       }
       entry
     }
 
-    override def finalize() {
+    override def finalize(): Unit = ifOpen {
       if (open) {
         trace("Leaked reference to RocksDB iterator, forcing close.")
         close()
@@ -250,9 +283,10 @@ class RocksDbKeyValueStore(
     // RocksDB's JNI interface does not expose getters/setters that allow the 
     // comparator to be pluggable, and the default is lexicographic, so it's
     // safe to just force lexicographic comparator here for now.
-    val comparator = lexicographic
-    iter.seek(from)
-    override def hasNext() = {
+    val comparator: LexicographicComparator = lexicographic
+    ifOpen(iter.seek(from))
+
+    override def hasNext() = ifOpen {
       super.hasNext() && comparator.compare(peekKey(), to) < 0
     }
   }
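
The caller-visible effect of the change above: operations on a closed store now fail fast with a SamzaException whose cause is the stack captured at the first close(), instead of crashing the JVM inside the JNI layer. A hypothetical caller-side sketch (the helper object and logging are not part of this patch):

    import org.apache.samza.SamzaException
    import org.apache.samza.storage.kv.RocksDbKeyValueStore

    object ClosedStoreExample {
      // Hypothetical helper: flush, tolerating a store that was already closed.
      def flushIfOpen(store: RocksDbKeyValueStore): Unit = {
        try {
          store.flush()
        } catch {
          case e: SamzaException =>
            // e.getCause is the Exception recording where the first close happened.
            System.err.println("Store already closed: " + e.getMessage)
        }
      }
    }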

http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 994387f..418e986 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -23,11 +23,12 @@ package org.apache.samza.storage.kv
 import java.io.File
 import java.util
 
+import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.junit.{Assert, Test}
-import org.rocksdb.{RocksIterator, RocksDB, FlushOptions, Options}
+import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator}
 
 class TestRocksDbKeyValueStore
 {
@@ -89,6 +90,39 @@ class TestRocksDbKeyValueStore
     rocksDBReadOnly.close()
   }
 
+  @Test(expected = classOf[SamzaException])
+  def testFlushAfterCloseThrowsException(): Unit = {
+    val map = new util.HashMap[String, String]()
+    val config = new MapConfig(map)
+    val options = new Options()
+    options.setCreateIfMissing(true)
+
+    val dbDir = new File(System.getProperty("java.io.tmpdir"))
+    val rocksDB = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore")
+
+    val key = "key".getBytes("UTF-8")
+    rocksDB.put(key, "val".getBytes("UTF-8"))
+
+    rocksDB.close()
+    rocksDB.flush()
+  }
+
+  @Test(expected = classOf[SamzaException])
+  def testGetAfterCloseThrowsException(): Unit = {
+    val map = new util.HashMap[String, String]()
+    val config = new MapConfig(map)
+    val options = new Options()
+    options.setCreateIfMissing(true)
+
+    val dbDir = new File(System.getProperty("java.io.tmpdir"))
+    val rocksDB = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore")
+
+    rocksDB.close()
+
+    val key = "key".getBytes("UTF-8")
+    rocksDB.get(key)
+  }
+
   @Test
   def testIteratorWithRemoval(): Unit = {
     val lock = new Object

http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-test/src/main/config/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties
index 7339052..7349c93 100644
--- a/samza-test/src/main/config/perf/kv-perf.properties
+++ b/samza-test/src/main/config/perf/kv-perf.properties
@@ -37,6 +37,38 @@ test.rocksdb-write-performance.set-2.message.count=1000000
 test.rocksdb-write-performance.set-3.message.size=1024
 test.rocksdb-write-performance.set-3.message.count=1000000
 
+
+# Config for rocksdb-concurrent-write-performance
+test.rocksdb-concurrent-write-performance.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+test.rocksdb-concurrent-write-performance.stores.test-store.object.cache.size=0
+test.rocksdb-concurrent-write-performance.partition.count=4
+
+test.rocksdb-concurrent-write-performance.set.count=6
+test.rocksdb-concurrent-write-performance.set-1.message.size=256
+test.rocksdb-concurrent-write-performance.set-1.message.count=1000000
+test.rocksdb-concurrent-write-performance.set-1.num.threads=4
+
+test.rocksdb-concurrent-write-performance.set-2.message.size=512
+test.rocksdb-concurrent-write-performance.set-2.message.count=1000000
+test.rocksdb-concurrent-write-performance.set-2.num.threads=4
+
+test.rocksdb-concurrent-write-performance.set-3.message.size=1024
+test.rocksdb-concurrent-write-performance.set-3.message.count=1000000
+test.rocksdb-concurrent-write-performance.set-3.num.threads=4
+
+test.rocksdb-concurrent-write-performance.set-4.message.size=256
+test.rocksdb-concurrent-write-performance.set-4.message.count=4000000
+test.rocksdb-concurrent-write-performance.set-4.num.threads=1
+
+test.rocksdb-concurrent-write-performance.set-5.message.size=512
+test.rocksdb-concurrent-write-performance.set-5.message.count=4000000
+test.rocksdb-concurrent-write-performance.set-5.num.threads=1
+
+test.rocksdb-concurrent-write-performance.set-6.message.size=1024
+test.rocksdb-concurrent-write-performance.set-6.message.count=4000000
+test.rocksdb-concurrent-write-performance.set-6.num.threads=1
+
+
 # Config for get-all-vs-get
 test.get-all-vs-get-write-many-read-many.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 # Disable caching
@@ -52,4 +84,4 @@ test.get-all-vs-get-write-once-read-many.partition.count=4
 test.get-all-vs-get-write-once-read-many.set.count=3
 
 # List of tests to execute
-test.methods=rocksdb-write-performance,get-all-vs-get-write-many-read-many,get-all-vs-get-write-once-read-many
+test.methods=rocksdb-write-performance,rocksdb-concurrent-write-performance,get-all-vs-get-write-many-read-many,get-all-vs-get-write-once-read-many
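
Each set-N block above is handed to the test method as a config subset (see config.subset("set-" + testSet + ".", true) in the driver further down). A minimal sketch of how those keys are read, with defaults mirroring runTestConcurrentMsgWritePerformance (the object name is illustrative):

    import org.apache.samza.config.{Config, MapConfig}
    import scala.collection.JavaConverters._

    object KvPerfConfigExample extends App {
      // Sketch: the "set-1." subset of the properties above, as the test sees it.
      val config: Config = new MapConfig(Map(
        "message.size" -> "256",
        "message.count" -> "1000000",
        "num.threads" -> "4").asJava)

      // Defaults mirror those in runTestConcurrentMsgWritePerformance below.
      val messageSizeBytes = config.getInt("message.size", 200)
      val messageCount = config.getInt("message.count", 100000)
      val numThreads = config.getInt("num.threads", 4)
      println(s"$numThreads threads x $messageCount msgs of $messageSizeBytes bytes")
    }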

http://git-wip-us.apache.org/repos/asf/samza/blob/711dd8dc/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 8e853b7..d481782 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -62,6 +62,7 @@ object TestKeyValuePerformance extends Logging {
   val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
     "all-with-deletes" -> runTestAllWithDeletes,
     "rocksdb-write-performance" -> runTestMsgWritePerformance,
+    "rocksdb-concurrent-write-performance" -> 
runTestConcurrentMsgWritePerformance,
     "get-all-vs-get-write-many-read-many" -> 
runTestGetAllVsGetWriteManyReadMany,
     "get-all-vs-get-write-once-read-many" -> 
runTestGetAllVsGetWriteOnceReadMany)
 
@@ -107,7 +108,6 @@ object TestKeyValuePerformance extends Logging {
         //Create a new DB instance for each test set
         val output = new File("/tmp/" + UUID.randomUUID())
         val byteSerde = new ByteSerde
-        info("Using output directory %s for %s using %s." format (output, 
storeName, storageEngine.getClass.getCanonicalName))
         val engine = storageEngine.getStorageEngine(
           storeName,
           output,
@@ -128,7 +128,6 @@ object TestKeyValuePerformance extends Logging {
         // Run the test method
         testMethod(db, config.subset("set-" + testSet + ".", true))
 
-        info("Cleaning up output directory for %s." format storeName)
         Util.rm(output)
       })
     }
@@ -151,6 +150,14 @@ object TestKeyValuePerformance extends Logging {
     new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes)
   }
 
+  def runTestConcurrentMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    val messageSizeBytes = config.getInt("message.size", 200)
+    val messageCount = config.getInt("message.count", 100000)
+    val numThreads = config.getInt("num.threads", 4)
+
+    new TestKeyValuePerformance().testConcurrentMsgWritePerformance(db, messageCount, messageSizeBytes, numThreads)
+  }
+
   def runTestGetAllVsGetWriteManyReadMany(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
     new TestKeyValuePerformance().testGetAllVsGetWriteManyReadMany(db, config)
   }
@@ -230,6 +237,32 @@ class TestKeyValuePerformance extends Logging {
     info("Total time to write %d msgs of size %d bytes : %s s" format 
(numMsgs, msgSizeInBytes, timeTaken * .001))
   }
 
+  def testConcurrentMsgWritePerformance(
+    store: KeyValueStore[Array[Byte], Array[Byte]],
+    numMsgs: Int = 100000,
+    msgSizeInBytes: Int = 200,
+    numThreads: Int = 4) {
+
+    val msg = (0 until msgSizeInBytes).map(i => "x").mkString.getBytes(Encoding)
+    def createThread(name: String): Thread = new Thread(new Runnable {
+      override def run() = {
+        (0 until numMsgs).foreach(i => {
+          store.put(i.toString.getBytes(Encoding), msg)
+        })
+      }
+    }, name)
+
+    val threads = (0 until numThreads).map(i => createThread(s"Writer $i"))
+
+    val start = System.currentTimeMillis
+    threads.foreach(_.start())
+    threads.foreach(_.join())
+    val timeTaken = System.currentTimeMillis - start
+
+    info("Total time to write %d msgs of size %d bytes with %s threads: %s sec"
+      format (numMsgs, msgSizeInBytes, numThreads, timeTaken * .001))
+  }
+
   /**
    * Test that ::getAll performance is better than that of ::get (test when there are many writes and many reads).
    * @param store key-value store instance that is being tested
