This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.14 by this push:
new a2c3aeeeed Allow to set max operation numbers in a single rocksdb
batch (#4044)
a2c3aeeeed is described below
commit a2c3aeeeed31b4bb907a6b914ab8fd99af73e2f4
Author: Yong Zhang <[email protected]>
AuthorDate: Mon Aug 14 08:49:27 2023 +0800
Allow to set max operation numbers in a single rocksdb batch (#4044)
---
In rocksdb, the memory usage is related to the batch size.
The more operations in a single batch, the more memory is consumed.
Expose a configuration option to allow controlling the batch size.
(cherry picked from commit ad0ed213c0d3abf971441c7160334af99d94159c)
---
.../bookie/storage/ldb/KeyValueStorage.java | 4 +++
.../bookie/storage/ldb/KeyValueStorageRocksDB.java | 28 ++++++++++++++--
.../bookkeeper/conf/ServerConfiguration.java | 24 ++++++++++++++
.../bookie/storage/ldb/KeyValueStorageTest.java | 37 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 2 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
index 27f987c2c4..36b92cdcc8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -168,5 +168,9 @@ public interface KeyValueStorage extends Closeable {
void clear();
void flush() throws IOException;
+
+ default int batchCount() {
+ return -1;
+ }
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
index 7a7aac8b95..97a9cb93f3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -69,8 +69,8 @@ public class KeyValueStorageRocksDB implements
KeyValueStorage {
private final ReadOptions optionCache;
private final ReadOptions optionDontCache;
-
private final WriteBatch emptyBatch;
+ private final int writeBatchMaxSize;
private static final String ROCKSDB_LOG_PATH = "dbStorage_rocksDB_logPath";
private static final String ROCKSDB_LOG_LEVEL =
"dbStorage_rocksDB_logLevel";
@@ -206,6 +206,7 @@ public class KeyValueStorageRocksDB implements
KeyValueStorage {
optionCache.setFillCache(true);
optionDontCache.setFillCache(false);
+ this.writeBatchMaxSize =
conf.getMaxOperationNumbersInSingleRocksDBBatch();
}
@Override
@@ -403,21 +404,29 @@ public class KeyValueStorageRocksDB implements
KeyValueStorage {
@Override
public Batch newBatch() {
- return new RocksDBBatch();
+ return new RocksDBBatch(writeBatchMaxSize);
}
private class RocksDBBatch implements Batch {
private final WriteBatch writeBatch = new WriteBatch();
+ private final int batchSize;
+ private int batchCount = 0;
+
+ RocksDBBatch(int batchSize) {
+ this.batchSize = batchSize;
+ }
@Override
public void close() {
writeBatch.close();
+ batchCount = 0;
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
try {
writeBatch.put(key, value);
+ countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
@@ -427,6 +436,7 @@ public class KeyValueStorageRocksDB implements
KeyValueStorage {
public void remove(byte[] key) throws IOException {
try {
writeBatch.delete(key);
+ countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
@@ -435,17 +445,31 @@ public class KeyValueStorageRocksDB implements
KeyValueStorage {
@Override
public void clear() {
writeBatch.clear();
+ batchCount = 0;
}
@Override
public void deleteRange(byte[] beginKey, byte[] endKey) throws
IOException {
try {
writeBatch.deleteRange(beginKey, endKey);
+ countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
}
+ private void countBatchAndFlushIfNeeded() throws IOException {
+ if (++batchCount >= batchSize) {
+ flush();
+ clear();
+ }
+ }
+
+ @Override
+ public int batchCount() {
+ return batchCount;
+ }
+
@Override
public void flush() throws IOException {
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 427812aeec..1ea8c47e10 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -310,6 +310,9 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
protected static final String AUTHORIZED_ROLES = "authorizedRoles";
protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE =
"rocksDBDeleteEntriesBatchSize";
+ protected static final String
MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH =
+ "maxOperationNumbersInSingleRocksdbWriteBatch";
+
protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD =
"skipReplayJournalInvalidRecord";
/**
@@ -3664,4 +3667,25 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
this.setProperty(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE,
rocksDBDeleteEntriesBatchSize);
return this;
}
+
+ /**
+ * Set the max operation numbers in a single rocksdb write batch.
+ * The rocksdb write batch is related to the memory usage. If the batch is
too large, it will cause the OOM.
+ *
+ * @param maxNumbersInSingleRocksDBBatch
+ * @return
+ */
+ public ServerConfiguration
setOperationMaxNumbersInSingleRocksDBWriteBatch(int
maxNumbersInSingleRocksDBBatch) {
+ this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH,
maxNumbersInSingleRocksDBBatch);
+ return this;
+ }
+
+ /**
+ * Get the max operation numbers in a single rocksdb write batch.
+ *
+ * @return
+ */
+ public int getMaxOperationNumbersInSingleRocksDBBatch() {
+ return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH,
100000);
+ }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
index 1c8ae855e6..dcf7d281e8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -172,4 +172,41 @@ public class KeyValueStorageTest {
db.close();
FileUtils.deleteDirectory(tmpDir);
}
+
+ @Test
+ public void testBatch() throws Exception {
+
+ configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5);
+
+ File tmpDir =
Files.createTempDirectory("junitTemporaryFolder").toFile();
+ Files.createDirectory(Paths.get(tmpDir.toString(), "subDir"));
+
+ KeyValueStorage db =
storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir",
DbConfigType.Huge,
+ configuration);
+
+ assertEquals(null, db.getFloor(toArray(3)));
+ assertEquals(0, db.count());
+
+ Batch batch = db.newBatch();
+ assertEquals(0, batch.batchCount());
+
+ batch.put(toArray(1), toArray(1));
+ batch.put(toArray(2), toArray(2));
+ assertEquals(2, batch.batchCount());
+
+ batch.put(toArray(3), toArray(3));
+ batch.put(toArray(4), toArray(4));
+ batch.put(toArray(5), toArray(5));
+ assertEquals(0, batch.batchCount());
+ batch.put(toArray(6), toArray(6));
+ assertEquals(1, batch.batchCount());
+
+ batch.flush();
+ assertEquals(1, batch.batchCount());
+ batch.close();
+ assertEquals(0, batch.batchCount());
+
+ db.close();
+ FileUtils.deleteDirectory(tmpDir);
+ }
}