Repository: samza
Updated Branches:
  refs/heads/master c6c10d31e -> 1c7e4d7aa


SAMZA-984; Upgraded RocksDB version to 5.0.1 and added configuration for managing RocksDB logging.

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Jake Maes <jacob.m...@gmail.com>, Jagadish <jagadish1...@gmail.com>

Closes #46 from prateekm/rocksdb-upgrade


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c7e4d7a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c7e4d7a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c7e4d7a

Branch: refs/heads/master
Commit: 1c7e4d7aaeb4b5036034c9182fa0259262bcda8e
Parents: c6c10d3
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Mon Jan 30 14:58:50 2017 -0800
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Jan 30 14:58:50 2017 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 15 ++++
 gradle/dependency-versions.gradle               |  2 +-
 .../samza/storage/kv/RocksDbOptionsHelper.java  | 83 +++++++++++---------
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 10 +--
 4 files changed, 69 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 7bac935..a26bc43 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1422,6 +1422,21 @@
                     </td>
                 </tr>
 
+                <tr>
+                    <td class="property" id="stores-rocksdb-log-file-size">stores.<span class="store">store-name</span>.<br>rocksdb.max.log.file.size.bytes</td>
+                    <td class="default">67108864</td>
+                    <td class="description">
+                        The maximum size in bytes of the RocksDB LOG file before it is rotated.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="stores-rocksdb-num-log-files">stores.<span class="store">store-name</span>.<br>rocksdb.keep.log.file.num</td>
+                    <td class="default">2</td>
+                    <td class="description">
+                        The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
+                    </td>
+                </tr>
 
                 <tr>
                     <th colspan="3" class="section" id="cluster-manager">

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 976a49c..db59672 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -30,7 +30,7 @@
   metricsVersion = "2.2.0"
   kafkaVersion = "0.10.0.1"
   commonsHttpClientVersion = "3.1"
-  rocksdbVersion = "3.13.1"
+  rocksdbVersion = "5.0.1"
   yarnVersion = "2.6.1"
   slf4jVersion = "1.6.2"
   log4jVersion = "1.2.17"

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index d4f765c..9b8f44b 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -34,6 +34,13 @@ import org.slf4j.LoggerFactory;
 public class RocksDbOptionsHelper {
   private static final Logger log = LoggerFactory.getLogger(RocksDbOptionsHelper.class);
 
+  private static final String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  private static final String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
+  private static final String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
+  private static final String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
+  private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
+  private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+
   public static Options options(Config storeConfig, SamzaContainerContext containerContext) {
     Options options = new Options();
     Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
@@ -42,59 +49,65 @@ public class RocksDbOptionsHelper {
     options.setWriteBufferSize((int) (writeBufSize / numTasks));
 
     CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
-    String compressionInConfig = storeConfig.get("rocksdb.compression", "snappy");
+    String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy");
     switch (compressionInConfig) {
-    case "snappy":
-      compressionType = CompressionType.SNAPPY_COMPRESSION;
-      break;
-    case "bzip2":
-      compressionType = CompressionType.BZLIB2_COMPRESSION;
-      break;
-    case "zlib":
-      compressionType = CompressionType.ZLIB_COMPRESSION;
-      break;
-    case "lz4":
-      compressionType = CompressionType.LZ4_COMPRESSION;
-      break;
-    case "lz4hc":
-      compressionType = CompressionType.LZ4HC_COMPRESSION;
-      break;
-    case "none":
-      compressionType = CompressionType.NO_COMPRESSION;
-      break;
-    default:
-      log.warn("Unknown rocksdb.compression codec " + compressionInConfig + ", overwriting to Snappy");
+      case "snappy":
+        compressionType = CompressionType.SNAPPY_COMPRESSION;
+        break;
+      case "bzip2":
+        compressionType = CompressionType.BZLIB2_COMPRESSION;
+        break;
+      case "zlib":
+        compressionType = CompressionType.ZLIB_COMPRESSION;
+        break;
+      case "lz4":
+        compressionType = CompressionType.LZ4_COMPRESSION;
+        break;
+      case "lz4hc":
+        compressionType = CompressionType.LZ4HC_COMPRESSION;
+        break;
+      case "none":
+        compressionType = CompressionType.NO_COMPRESSION;
+        break;
+      default:
+        log.warn("Unknown rocksdb.compression codec " + compressionInConfig +
+            ", overwriting to " + compressionType.name());
     }
     options.setCompressionType(compressionType);
 
     Long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
     Long cacheSizePerContainer = cacheSize / numTasks;
-    int blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096);
+
+    int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
     BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
     tableOptions.setBlockCacheSize(cacheSizePerContainer).setBlockSize(blockSize);
     options.setTableFormatConfig(tableOptions);
 
     CompactionStyle compactionStyle = CompactionStyle.UNIVERSAL;
-    String compactionStyleInConfig = storeConfig.get("rocksdb.compaction.style", "universal");
+    String compactionStyleInConfig = storeConfig.get(ROCKSDB_COMPACTION_STYLE, "universal");
     switch (compactionStyleInConfig) {
-    case "universal":
-      compactionStyle = CompactionStyle.UNIVERSAL;
-      break;
-    case "fifo":
-      compactionStyle = CompactionStyle.FIFO;
-      break;
-    case "level":
-      compactionStyle = CompactionStyle.LEVEL;
-      break;
-    default:
-      log.warn("Unknown rocksdb.compactionStyle " + compactionStyleInConfig + ", overwriting to universal");
+      case "universal":
+        compactionStyle = CompactionStyle.UNIVERSAL;
+        break;
+      case "fifo":
+        compactionStyle = CompactionStyle.FIFO;
+        break;
+      case "level":
+        compactionStyle = CompactionStyle.LEVEL;
+        break;
+      default:
+        log.warn("Unknown rocksdb.compaction.style " + compactionStyleInConfig +
+            ", overwriting to " + compactionStyle.name());
     }
     options.setCompactionStyle(compactionStyle);
 
-    options.setMaxWriteBufferNumber(storeConfig.getInt("rocksdb.num.write.buffers", 3));
+    options.setMaxWriteBufferNumber(storeConfig.getInt(ROCKSDB_NUM_WRITE_BUFFERS, 3));
     options.setCreateIfMissing(true);
     options.setErrorIfExists(false);
 
+    options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 64 * 1024 * 1024L));
+    options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 2));
+
     return options;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1c7e4d7a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 73b89f7..5112ac6 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -139,7 +139,7 @@ class RocksDbKeyValueStore(
     metrics.puts.inc
     require(key != null, "Null key not allowed.")
     if (value == null) {
-      db.remove(writeOptions, key)
+      db.delete(writeOptions, key)
     } else {
       metrics.bytesWritten.inc(key.size + value.size)
       db.put(writeOptions, key, value)
@@ -156,7 +156,7 @@ class RocksDbKeyValueStore(
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
-        db.remove(writeOptions, curr.getKey)
+        db.delete(writeOptions, curr.getKey)
       } else {
         val key = curr.getKey
         val value = curr.getValue
@@ -204,13 +204,13 @@ class RocksDbKeyValueStore(
 
   class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
     private var open = true
-    private var firstValueAccessed = false;
+    private var firstValueAccessed = false
 
     def close() = {
       open = false
-      iter.dispose()
+      iter.close()
     }
 
-    def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove");
+    def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")
 
     def hasNext() = iter.isValid