This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.14 by this push:
     new a2c3aeeeed Allow to set max operation numbers in a single rocksdb 
batch (#4044)
a2c3aeeeed is described below

commit a2c3aeeeed31b4bb907a6b914ab8fd99af73e2f4
Author: Yong Zhang <[email protected]>
AuthorDate: Mon Aug 14 08:49:27 2023 +0800

    Allow to set max operation numbers in a single rocksdb batch (#4044)
    
    ---
    
    In rocksdb, the memory usage is related to the batch size.
    The more operations in a single batch, the more memory is consumed.
    Expose a configuration setting to allow controlling the batch size.
    
    (cherry picked from commit ad0ed213c0d3abf971441c7160334af99d94159c)
---
 .../bookie/storage/ldb/KeyValueStorage.java        |  4 +++
 .../bookie/storage/ldb/KeyValueStorageRocksDB.java | 28 ++++++++++++++--
 .../bookkeeper/conf/ServerConfiguration.java       | 24 ++++++++++++++
 .../bookie/storage/ldb/KeyValueStorageTest.java    | 37 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
index 27f987c2c4..36b92cdcc8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -168,5 +168,9 @@ public interface KeyValueStorage extends Closeable {
         void clear();
 
         void flush() throws IOException;
+
+        default int batchCount() {
+            return -1;
+        }
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
index 7a7aac8b95..97a9cb93f3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -69,8 +69,8 @@ public class KeyValueStorageRocksDB implements 
KeyValueStorage {
 
     private final ReadOptions optionCache;
     private final ReadOptions optionDontCache;
-
     private final WriteBatch emptyBatch;
+    private final int writeBatchMaxSize;
 
     private static final String ROCKSDB_LOG_PATH = "dbStorage_rocksDB_logPath";
     private static final String ROCKSDB_LOG_LEVEL = 
"dbStorage_rocksDB_logLevel";
@@ -206,6 +206,7 @@ public class KeyValueStorageRocksDB implements 
KeyValueStorage {
 
         optionCache.setFillCache(true);
         optionDontCache.setFillCache(false);
+        this.writeBatchMaxSize = 
conf.getMaxOperationNumbersInSingleRocksDBBatch();
     }
 
     @Override
@@ -403,21 +404,29 @@ public class KeyValueStorageRocksDB implements 
KeyValueStorage {
 
     @Override
     public Batch newBatch() {
-        return new RocksDBBatch();
+        return new RocksDBBatch(writeBatchMaxSize);
     }
 
     private class RocksDBBatch implements Batch {
         private final WriteBatch writeBatch = new WriteBatch();
+        private final int batchSize;
+        private int batchCount = 0;
+
+        RocksDBBatch(int batchSize) {
+            this.batchSize = batchSize;
+        }
 
         @Override
         public void close() {
             writeBatch.close();
+            batchCount = 0;
         }
 
         @Override
         public void put(byte[] key, byte[] value) throws IOException {
             try {
                 writeBatch.put(key, value);
+                countBatchAndFlushIfNeeded();
             } catch (RocksDBException e) {
                 throw new IOException("Failed to flush RocksDB batch", e);
             }
@@ -427,6 +436,7 @@ public class KeyValueStorageRocksDB implements 
KeyValueStorage {
         public void remove(byte[] key) throws IOException {
             try {
                 writeBatch.delete(key);
+                countBatchAndFlushIfNeeded();
             } catch (RocksDBException e) {
                 throw new IOException("Failed to flush RocksDB batch", e);
             }
@@ -435,17 +445,31 @@ public class KeyValueStorageRocksDB implements 
KeyValueStorage {
         @Override
         public void clear() {
             writeBatch.clear();
+            batchCount = 0;
         }
 
         @Override
         public void deleteRange(byte[] beginKey, byte[] endKey) throws 
IOException {
             try {
                 writeBatch.deleteRange(beginKey, endKey);
+                countBatchAndFlushIfNeeded();
             } catch (RocksDBException e) {
                 throw new IOException("Failed to flush RocksDB batch", e);
             }
         }
 
+        private void countBatchAndFlushIfNeeded() throws IOException {
+            if (++batchCount >= batchSize) {
+                flush();
+                clear();
+            }
+        }
+
+        @Override
+        public int batchCount() {
+            return batchCount;
+        }
+
         @Override
         public void flush() throws IOException {
             try {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 427812aeec..1ea8c47e10 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -310,6 +310,9 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String AUTHORIZED_ROLES = "authorizedRoles";
     protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = 
"rocksDBDeleteEntriesBatchSize";
 
+    protected static final String 
MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH =
+        "maxOperationNumbersInSingleRocksdbWriteBatch";
+
     protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = 
"skipReplayJournalInvalidRecord";
 
     /**
@@ -3664,4 +3667,25 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
         this.setProperty(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE, 
rocksDBDeleteEntriesBatchSize);
         return this;
     }
+
+    /**
+     * Set the max operation numbers in a single rocksdb write batch.
+     * The memory usage of a rocksdb write batch grows with its size. If the batch is 
too large, it can cause an out-of-memory (OOM) error.
+     *
+     * @param maxNumbersInSingleRocksDBBatch
+     * @return
+     */
+    public ServerConfiguration 
setOperationMaxNumbersInSingleRocksDBWriteBatch(int 
maxNumbersInSingleRocksDBBatch) {
+        this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 
maxNumbersInSingleRocksDBBatch);
+        return this;
+    }
+
+    /**
+     * Get the max operation numbers in a single rocksdb write batch.
+     *
+     * @return
+     */
+    public int getMaxOperationNumbersInSingleRocksDBBatch() {
+        return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 
100000);
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
index 1c8ae855e6..dcf7d281e8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -172,4 +172,41 @@ public class KeyValueStorageTest {
         db.close();
         FileUtils.deleteDirectory(tmpDir);
     }
+
+    @Test
+    public void testBatch() throws Exception {
+
+        configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5);
+
+        File tmpDir = 
Files.createTempDirectory("junitTemporaryFolder").toFile();
+        Files.createDirectory(Paths.get(tmpDir.toString(), "subDir"));
+
+        KeyValueStorage db = 
storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", 
DbConfigType.Huge,
+                configuration);
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(0, db.count());
+
+        Batch batch = db.newBatch();
+        assertEquals(0, batch.batchCount());
+
+        batch.put(toArray(1), toArray(1));
+        batch.put(toArray(2), toArray(2));
+        assertEquals(2, batch.batchCount());
+
+        batch.put(toArray(3), toArray(3));
+        batch.put(toArray(4), toArray(4));
+        batch.put(toArray(5), toArray(5));
+        assertEquals(0, batch.batchCount());
+        batch.put(toArray(6), toArray(6));
+        assertEquals(1, batch.batchCount());
+
+        batch.flush();
+        assertEquals(1, batch.batchCount());
+        batch.close();
+        assertEquals(0, batch.batchCount());
+
+        db.close();
+        FileUtils.deleteDirectory(tmpDir);
+    }
 }

Reply via email to