This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit eff2670f7e9427067a478672cc2c69d1020bdd9d Author: Yong Zhang <[email protected]> AuthorDate: Mon Aug 14 08:49:27 2023 +0800 Allow to set max operation numbers in a single rocksdb batch (#4044) --- ## Motivation In rocksdb, the memory usage is related to the batch size. The more operations in a single batch, the more memory is consumed. Expose the configuration to allow controlling the batch size. (cherry picked from commit ad0ed213c0d3abf971441c7160334af99d94159c) --- .../bookie/storage/ldb/KeyValueStorage.java | 4 +++ .../bookie/storage/ldb/KeyValueStorageRocksDB.java | 28 +++++++++++++++- .../bookkeeper/conf/ServerConfiguration.java | 24 ++++++++++++++ .../bookie/storage/ldb/KeyValueStorageTest.java | 37 ++++++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java index 27f987c2c4..36b92cdcc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java @@ -168,5 +168,9 @@ public interface KeyValueStorage extends Closeable { void clear(); void flush() throws IOException; + + default int batchCount() { + return -1; + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java index f8ac00c8d8..361f4e80ce 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java @@ -83,6 +83,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { private final ReadOptions optionCache; private 
final ReadOptions optionDontCache; private final WriteBatch emptyBatch; + private final int writeBatchMaxSize; private static final String ROCKSDB_LOG_PATH = "dbStorage_rocksDB_logPath"; private static final String ROCKSDB_LOG_LEVEL = "dbStorage_rocksDB_logLevel"; @@ -139,6 +140,8 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { optionCache.setFillCache(true); optionDontCache.setFillCache(false); + + this.writeBatchMaxSize = conf.getMaxOperationNumbersInSingleRocksDBBatch(); } private RocksDB initializeRocksDBWithConfFile(String basePath, String subPath, DbConfigType dbConfigType, @@ -473,21 +476,29 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { @Override public Batch newBatch() { - return new RocksDBBatch(); + return new RocksDBBatch(writeBatchMaxSize); } private class RocksDBBatch implements Batch { private final WriteBatch writeBatch = new WriteBatch(); + private final int batchSize; + private int batchCount = 0; + + RocksDBBatch(int batchSize) { + this.batchSize = batchSize; + } @Override public void close() { writeBatch.close(); + batchCount = 0; } @Override public void put(byte[] key, byte[] value) throws IOException { try { writeBatch.put(key, value); + countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); } @@ -497,6 +508,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { public void remove(byte[] key) throws IOException { try { writeBatch.delete(key); + countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); } @@ -505,17 +517,31 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { @Override public void clear() { writeBatch.clear(); + batchCount = 0; } @Override public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException { try { writeBatch.deleteRange(beginKey, endKey); + countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new 
IOException("Failed to flush RocksDB batch", e); } } + private void countBatchAndFlushIfNeeded() throws IOException { + if (++batchCount >= batchSize) { + flush(); + clear(); + } + } + + @Override + public int batchCount() { + return batchCount; + } + @Override public void flush() throws IOException { try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 606423b49b..aab3fd5976 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -336,6 +336,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati // Used for location index, lots of writes and much bigger dataset protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf"; + protected static final String MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH = + "maxOperationNumbersInSingleRocksdbWriteBatch"; + protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord"; /** @@ -4027,4 +4030,25 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati setProperty(REPLICATION_RATE_BY_BYTES, rate); return this; } + + /** + * Set the max operation numbers in a single rocksdb write batch. + * The rocksdb write batch is related to the memory usage. If the batch is too large, it will cause the OOM. + * + * @param maxNumbersInSingleRocksDBBatch + * @return + */ + public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int maxNumbersInSingleRocksDBBatch) { + this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, maxNumbersInSingleRocksDBBatch); + return this; + } + + /** + * Get the max operation numbers in a single rocksdb write batch. 
+ * + * @return + */ + public int getMaxOperationNumbersInSingleRocksDBBatch() { + return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java index ed1e30825b..181c666e7b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java @@ -172,4 +172,41 @@ public class KeyValueStorageTest { db.close(); FileUtils.deleteDirectory(tmpDir); } + + @Test + public void testBatch() throws Exception { + + configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5); + + File tmpDir = Files.createTempDirectory("junitTemporaryFolder").toFile(); + Files.createDirectory(Paths.get(tmpDir.toString(), "subDir")); + + KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", DbConfigType.Default, + configuration); + + assertEquals(null, db.getFloor(toArray(3))); + assertEquals(0, db.count()); + + Batch batch = db.newBatch(); + assertEquals(0, batch.batchCount()); + + batch.put(toArray(1), toArray(1)); + batch.put(toArray(2), toArray(2)); + assertEquals(2, batch.batchCount()); + + batch.put(toArray(3), toArray(3)); + batch.put(toArray(4), toArray(4)); + batch.put(toArray(5), toArray(5)); + assertEquals(0, batch.batchCount()); + batch.put(toArray(6), toArray(6)); + assertEquals(1, batch.batchCount()); + + batch.flush(); + assertEquals(1, batch.batchCount()); + batch.close(); + assertEquals(0, batch.batchCount()); + + db.close(); + FileUtils.deleteDirectory(tmpDir); + } }
