This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.16 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit e531fd1806ce37cdfbdcd56b29c4c8d46b5fd2ca Author: Yong Zhang <[email protected]> AuthorDate: Mon Aug 14 08:49:27 2023 +0800 Allow to set max operation numbers in a single rocksdb batch (#4044) --- ## Motivation In RocksDB, memory usage is related to the batch size: the more operations in a single batch, the more memory is consumed. Expose a configuration option that allows controlling the batch size. (cherry picked from commit ad0ed213c0d3abf971441c7160334af99d94159c) --- .../bookie/storage/ldb/KeyValueStorage.java | 4 +++ .../bookie/storage/ldb/KeyValueStorageRocksDB.java | 28 +++++++++++++++- .../bookkeeper/conf/ServerConfiguration.java | 24 ++++++++++++++ .../bookie/storage/ldb/KeyValueStorageTest.java | 37 ++++++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java index ab724d73bb..8e18148c08 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java @@ -178,5 +178,9 @@ public interface KeyValueStorage extends Closeable { void clear(); void flush() throws IOException; + + default int batchCount() { + return -1; + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java index fd87050693..a77a0a18f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java @@ -84,6 +84,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { private final ReadOptions optionCache; private 
final ReadOptions optionDontCache; private final WriteBatch emptyBatch; + private final int writeBatchMaxSize; private String dbPath; @@ -143,6 +144,8 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { optionCache.setFillCache(true); optionDontCache.setFillCache(false); + + this.writeBatchMaxSize = conf.getMaxOperationNumbersInSingleRocksDBBatch(); } private RocksDB initializeRocksDBWithConfFile(String basePath, String subPath, DbConfigType dbConfigType, @@ -516,21 +519,29 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { @Override public Batch newBatch() { - return new RocksDBBatch(); + return new RocksDBBatch(writeBatchMaxSize); } private class RocksDBBatch implements Batch { private final WriteBatch writeBatch = new WriteBatch(); + private final int batchSize; + private int batchCount = 0; + + RocksDBBatch(int batchSize) { + this.batchSize = batchSize; + } @Override public void close() { writeBatch.close(); + batchCount = 0; } @Override public void put(byte[] key, byte[] value) throws IOException { try { writeBatch.put(key, value); + countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); } @@ -540,6 +551,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { public void remove(byte[] key) throws IOException { try { writeBatch.delete(key); + countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); } @@ -548,17 +560,31 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { @Override public void clear() { writeBatch.clear(); + batchCount = 0; } @Override public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException { try { writeBatch.deleteRange(beginKey, endKey); + countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); } } + private void countBatchAndFlushIfNeeded() throws IOException { + if 
(++batchCount >= batchSize) { + flush(); + clear(); + } + } + + @Override + public int batchCount() { + return batchCount; + } + @Override public void flush() throws IOException { try { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 2849d3fd66..019144e994 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -338,6 +338,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati // Used for location index, lots of writes and much bigger dataset protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf"; + protected static final String MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH = + "maxOperationNumbersInSingleRocksdbWriteBatch"; + protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord"; /** @@ -4079,4 +4082,25 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati this.setProperty(LEDGER_METADATA_ROCKSDB_CONF, ledgerMetadataRocksdbConf); return this; } + + /** + * Set the max operation numbers in a single rocksdb write batch. + * The rocksdb write batch is related to the memory usage. If the batch is too large, it will cause the OOM. + * + * @param maxNumbersInSingleRocksDBBatch + * @return + */ + public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int maxNumbersInSingleRocksDBBatch) { + this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, maxNumbersInSingleRocksDBBatch); + return this; + } + + /** + * Get the max operation numbers in a single rocksdb write batch. 
+ * + * @return + */ + public int getMaxOperationNumbersInSingleRocksDBBatch() { + return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java index 458f3c69f9..d52f19305e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java @@ -170,4 +170,41 @@ public class KeyValueStorageTest { db.close(); FileUtils.deleteDirectory(tmpDir); } + + @Test + public void testBatch() throws Exception { + + configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5); + + File tmpDir = Files.createTempDirectory("junitTemporaryFolder").toFile(); + Files.createDirectory(Paths.get(tmpDir.toString(), "subDir")); + + KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", DbConfigType.Default, + configuration); + + assertEquals(null, db.getFloor(toArray(3))); + assertEquals(0, db.count()); + + Batch batch = db.newBatch(); + assertEquals(0, batch.batchCount()); + + batch.put(toArray(1), toArray(1)); + batch.put(toArray(2), toArray(2)); + assertEquals(2, batch.batchCount()); + + batch.put(toArray(3), toArray(3)); + batch.put(toArray(4), toArray(4)); + batch.put(toArray(5), toArray(5)); + assertEquals(0, batch.batchCount()); + batch.put(toArray(6), toArray(6)); + assertEquals(1, batch.batchCount()); + + batch.flush(); + assertEquals(1, batch.batchCount()); + batch.close(); + assertEquals(0, batch.batchCount()); + + db.close(); + FileUtils.deleteDirectory(tmpDir); + } }
