SAMZA-819 RocksDbKeyValueStore.flush() should be implemented
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c84a0b54 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c84a0b54 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c84a0b54 Branch: refs/heads/samza-sql Commit: c84a0b54d634de5a4c996ae3ce9742d98ae23342 Parents: 429f245 Author: Tao Feng <[email protected]> Authored: Fri Nov 20 14:43:36 2015 -0800 Committer: Navina <[email protected]> Committed: Fri Nov 20 14:43:36 2015 -0800 ---------------------------------------------------------------------- .../RocksDbKeyValueStorageEngineFactory.scala | 5 ++-- .../samza/storage/kv/RocksDbKeyValueStore.scala | 5 ++-- .../storage/kv/TestRocksDbKeyValueStore.scala | 24 +++++++++++++++++++- 3 files changed, 29 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index b949793..dae6e35 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -24,7 +24,7 @@ import org.apache.samza.container.SamzaContainerContext import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.storage.kv._ import org.apache.samza.system.SystemStreamPartition -import org.rocksdb.WriteOptions +import org.rocksdb.{FlushOptions, WriteOptions} import org.apache.samza.config.StorageConfig._ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] @@ -48,7 +48,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext) val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) - val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbMetrics) + val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true) + val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbFlushOptions, rocksDbMetrics) rocksDb } } http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 4620037..211fc3b 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -95,6 +95,7 @@ class RocksDbKeyValueStore( val isLoggedStore: Boolean, val storeName: String, val writeOptions: WriteOptions = new WriteOptions(), + val flushOptions: FlushOptions = new FlushOptions(), val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging { // lazy val here is important because the store directories do not exist yet, it can only be opened @@ -190,8 +191,8 @@ class RocksDbKeyValueStore( def flush { metrics.flushes.inc - // TODO still not exposed in Java RocksDB API, follow up with rocksDB team - trace("Flush in RocksDbKeyValueStore is not supported, ignoring") + trace("Flushing.") + db.flush(flushOptions) } def close() { http://git-wip-us.apache.org/repos/asf/samza/blob/c84a0b54/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala index a428a16..0c86a5a 100644 --- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala @@ -30,7 +30,7 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.{NoOpMetricsRegistry, ExponentialSleepStrategy} import org.apache.samza.util.Util._ import org.junit.{Assert, Test} -import org.rocksdb.{RocksDBException, Options} +import org.rocksdb.{RocksDB, FlushOptions, RocksDBException, Options} class TestRocksDbKeyValueStore { @@ -65,4 +65,26 @@ class TestRocksDbKeyValueStore Assert.assertNull(rocksDB.get(key)) rocksDB.close() } + + @Test + def testFlush(): Unit = { + val map = new util.HashMap[String, String]() + val config = new MapConfig(map) + val flushOptions = new FlushOptions().setWaitForFlush(true) + val options = new Options() + options.setCreateIfMissing(true) + val rocksDB = RocksDbKeyValueStore.openDB(new File(System.getProperty("java.io.tmpdir")), + options, + config, + false, + "dbStore") + val key = "key".getBytes("UTF-8") + rocksDB.put(key, "val".getBytes("UTF-8")) + rocksDB.flush(flushOptions) + val dbDir = new File(System.getProperty("java.io.tmpdir")).toString + val rocksDBReadOnly = RocksDB.openReadOnly(options, dbDir) + Assert.assertEquals(new String(rocksDBReadOnly.get(key), "UTF-8"), "val") + rocksDB.close() + rocksDBReadOnly.close() + } }
