This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 58d2338de KUDU-3371 [fs] Use RocksDB to store LBM metadata
58d2338de is described below

commit 58d2338de8d586106ea29cfa281ee653392a9145
Author: Yingchun Lai <[email protected]>
AuthorDate: Sun Mar 12 22:41:48 2023 +0800

    KUDU-3371 [fs] Use RocksDB to store LBM metadata
    
    Since LogBlockContainerNativeMeta stores block records
    sequentially in the metadata file, the ratio of live blocks
    may become very low, which can cause serious disk space
    amplification and long bootstrap times.
    
    This patch introduces a new class, LogBlockContainerRdbMeta,
    which uses RocksDB to store LBM metadata: an item is Put()
    into RocksDB when a new block is created in the LBM, and
    Delete()d from RocksDB when the block is removed. RocksDB
    maintains its own data, i.e. deleted items are garbage
    collected, so there is no need to rewrite the metadata as
    LogBlockContainerNativeMeta does.
    
    The implementation reuses most of the logic of the base class
    LogBlockContainer; the main difference from
    LogBlockContainerNativeMeta is that LogBlockContainerRdbMeta
    stores block record metadata in RocksDB rather than in a
    native file. The base class interfaces are implemented as
    follows:
    a. Create a container
       The data file is created as in LogBlockContainerNativeMeta,
       but the metadata is stored in RocksDB with keys constructed
       as "<container_id>.<block_id>" and values identical to the
       records in LogBlockContainerNativeMeta's metadata file.
    b. Open a container
       Similar to LogBlockContainerNativeMeta, but there is no need
       to check the metadata, since it was already checked when
       loading containers during the bootstrap phase.
    c. Destroy a container
       If the container is dead (full and with no live blocks),
       remove the data file and clean up the metadata by deleting
       all keys prefixed by "<container_id>".
    d. Load a container (by ProcessRecords())
       Iterate over RocksDB in the key range
       [<container_id>, <next_container_id>). Because dead blocks
       have already been deleted, only live block records are
       populated, and they can be used just as in
       LogBlockContainerNativeMeta.
    e. Create blocks in a container
       Put() serialized BlockRecordPB records into RocksDB, with
       keys constructed as above.
    f. Remove blocks from a container
       Construct the keys as above, then Delete() them from
       RocksDB in a batch.
    
    This patch contains the following changes:
    - Adds a new block manager type named 'logr', which uses RocksDB
      to store LBM metadata. The new block manager is enabled by
      setting --block_manager=logr.
    - Adds a new parameterized value to the related tests to cover
      the "--block_manager=logr" case.
    
    Using RocksDB is optional; the former LBM can still be used as
    before. Tools to convert data between the two implementations
    will be introduced in the future.
    
    The optimization is significant, as shown in JIRA KUDU-3371:
    the time spent re-opening the tablet server's metadata when
    99.99% of all records have been removed is reduced by about
    9.5x when using LogBlockContainerRdbMeta instead of
    LogBlockContainerNativeMeta.
    
    Change-Id: Ie72f6914eb5653a9c034766c6cd3741a8340711f
    Reviewed-on: http://gerrit.cloudera.org:8080/18569
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/benchmarks/CMakeLists.txt              |   8 +-
 src/kudu/client/CMakeLists.txt                  |   3 +-
 src/kudu/consensus/CMakeLists.txt               |   1 +
 src/kudu/fs/CMakeLists.txt                      |   5 +-
 src/kudu/fs/block_manager-stress-test.cc        |  15 +-
 src/kudu/fs/block_manager-test.cc               | 126 ++++--
 src/kudu/fs/block_manager.h                     |   2 +-
 src/kudu/fs/data_dirs.cc                        |   7 +-
 src/kudu/fs/dir_manager.cc                      | 141 ++++++
 src/kudu/fs/dir_manager.h                       |  59 ++-
 src/kudu/fs/dir_util.cc                         |   4 +-
 src/kudu/fs/file_block_manager.h                |   2 +
 src/kudu/fs/fs_manager-test.cc                  |  39 +-
 src/kudu/fs/fs_manager.cc                       |  16 +-
 src/kudu/fs/fs_manager.h                        |   3 +
 src/kudu/fs/fs_report.cc                        |  31 ++
 src/kudu/fs/fs_report.h                         |  21 +
 src/kudu/fs/log_block_manager-test-util.cc      | 230 ++++++++--
 src/kudu/fs/log_block_manager-test-util.h       |  32 +-
 src/kudu/fs/log_block_manager-test.cc           | 499 +++++++++++++++++++---
 src/kudu/fs/log_block_manager.cc                | 542 +++++++++++++++++++++++-
 src/kudu/fs/log_block_manager.h                 |  94 +++-
 src/kudu/integration-tests/CMakeLists.txt       |   2 +-
 src/kudu/integration-tests/dense_node-itest.cc  |  19 +-
 src/kudu/integration-tests/ts_recovery-itest.cc |  18 +-
 src/kudu/server/CMakeLists.txt                  |   4 +-
 src/kudu/tablet/compaction-test.cc              |   2 +-
 src/kudu/tools/CMakeLists.txt                   |   1 +
 src/kudu/tools/kudu-tool-test.cc                |  15 +
 src/kudu/tserver/tablet_server-test.cc          |  13 +-
 src/kudu/util/CMakeLists.txt                    |   7 +-
 thirdparty/build-definitions.sh                 |   6 +-
 32 files changed, 1782 insertions(+), 185 deletions(-)

diff --git a/src/kudu/benchmarks/CMakeLists.txt b/src/kudu/benchmarks/CMakeLists.txt
index 6a17d4727..73b54462c 100644
--- a/src/kudu/benchmarks/CMakeLists.txt
+++ b/src/kudu/benchmarks/CMakeLists.txt
@@ -33,13 +33,15 @@ target_link_libraries(tpch
 add_executable(tpch1 tpch/tpch1.cc)
 target_link_libraries(tpch1
   ${KUDU_MIN_TEST_LIBS}
-  tpch)
+  tpch
+  rocksdb)
 
 # tpch_real_world
 add_executable(tpch_real_world tpch/tpch_real_world.cc)
 target_link_libraries(tpch_real_world
   ${KUDU_MIN_TEST_LIBS}
-  tpch)
+  tpch
+  rocksdb)
 
 # rle
 add_executable(rle rle.cc)
@@ -59,5 +61,5 @@ endif()
 # Unit tests
 #######################################
 
-SET_KUDU_TEST_LINK_LIBS(tpch)
+SET_KUDU_TEST_LINK_LIBS(tpch rocksdb)
 ADD_KUDU_TEST(tpch/rpc_line_item_dao-test)
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index cecaf6546..bb5479e13 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -280,7 +280,8 @@ endif()
 SET_KUDU_TEST_LINK_LIBS(
   itest_util
   kudu_client
-  mini_cluster)
+  mini_cluster
+  rocksdb)
 ADD_KUDU_TEST(client-test NUM_SHARDS 8 PROCESSORS 2
                           DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(client-unittest)
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 353c7468a..3fc36c388 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -126,6 +126,7 @@ SET_KUDU_TEST_LINK_LIBS(
   consensus
   tserver_proto
   cfile
+  rocksdb
   tablet
   kudu_util)
 
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 5482f3c42..ef7d56b96 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -43,7 +43,8 @@ target_link_libraries(kudu_fs
   fs_proto
   kudu_util
   gutil
-  ranger_kms_client)
+  ranger_kms_client
+  rocksdb)
 
 if(NOT NO_TESTS)
   add_library(kudu_fs_test_util
@@ -60,7 +61,7 @@ endif()
 # Unit tests
 #######################################
 
-SET_KUDU_TEST_LINK_LIBS(kudu_fs kudu_fs_test_util)
+SET_KUDU_TEST_LINK_LIBS(kudu_fs kudu_fs_test_util rocksdb lz4)
 ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(data_dirs-test)
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 659234887..14482e8fb 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -65,6 +65,7 @@
 DECLARE_bool(cache_force_single_shard);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
+DECLARE_string(block_manager);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
@@ -132,6 +133,7 @@ class BlockManagerStressTest : public KuduTest {
       total_blocks_read_(0),
       total_bytes_read_(0),
       total_blocks_deleted_(0) {
+    FLAGS_block_manager = T::name();
 
     // Increase the number of containers created.
     FLAGS_log_container_max_size = 1 * 1024 * 1024;
@@ -482,8 +484,8 @@ int BlockManagerStressTest<FileBlockManager>::GetMaxFdCount() const {
       FLAGS_num_reader_threads;
 }
 
-template <>
-int BlockManagerStressTest<LogBlockManagerNativeMeta>::GetMaxFdCount() const {
+template <typename T>
+int BlockManagerStressTest<T>::GetMaxFdCount() const {
   return FLAGS_max_open_files +
       // If all containers are full, each open block could theoretically
       // result in a new container, which is two files briefly outside the
@@ -496,9 +498,9 @@ void BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
   // Do nothing; the FBM has no repairable inconsistencies.
 }
 
-template <>
-void BlockManagerStressTest<LogBlockManagerNativeMeta>::InjectNonFatalInconsistencies() {
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), rand_seed_);
+template <typename T>
+void BlockManagerStressTest<T>::InjectNonFatalInconsistencies() {
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), rand_seed_);
   ASSERT_OK(corruptor->Init());
 
   for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
@@ -508,7 +510,8 @@ void BlockManagerStressTest<LogBlockManagerNativeMeta>::InjectNonFatalInconsiste
 
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
-typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta> BlockManagers;
+typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta, LogBlockManagerRdbMeta>
+    BlockManagers;
 #else
 typedef ::testing::Types<FileBlockManager> BlockManagers;
 #endif
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 724d3d927..3cc55fb4c 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -41,6 +41,7 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/basictypes.h"
@@ -88,8 +89,10 @@ METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
 METRIC_DECLARE_counter(block_manager_total_writable_blocks);
 METRIC_DECLARE_counter(block_manager_total_readable_blocks);
+METRIC_DECLARE_counter(block_manager_total_blocks_deleted);
 METRIC_DECLARE_counter(block_manager_total_bytes_written);
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
+METRIC_DECLARE_counter(log_block_manager_holes_punched);
 
 // Data directory metrics.
 METRIC_DECLARE_gauge_uint64(data_dirs_full);
@@ -97,7 +100,7 @@ METRIC_DECLARE_gauge_uint64(data_dirs_full);
 // The LogBlockManager is only supported on Linux, since it requires hole punching.
 #define RETURN_NOT_LOG_BLOCK_MANAGER() \
   do { \
-    if (FLAGS_block_manager != "log") { \
+    if (FLAGS_block_manager != "log" && FLAGS_block_manager != "logr") { \
       GTEST_SKIP() << "This platform does not use the log block manager by default. " \
                       "Skipping test."; \
     } \
@@ -109,13 +112,7 @@ namespace fs {
 static const char* kTestData = "test data";
 
 template<typename T>
-string block_manager_type();
-
-template<>
-string block_manager_type<FileBlockManager>() { return "file"; }
-
-template<>
-string block_manager_type<LogBlockManagerNativeMeta>() { return "log"; }
+string block_manager_type() { return T::name(); }
 
 template <typename T>
 class BlockManagerTest : public KuduTest {
@@ -123,9 +120,9 @@ class BlockManagerTest : public KuduTest {
   BlockManagerTest() :
       test_tablet_name_("test_tablet"),
       test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
-      file_cache_("test_cache", env_, 1, scoped_refptr<MetricEntity>()),
-      bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
-                             shared_ptr<MemTracker>())) {
+      file_cache_("test_cache", env_, 1, scoped_refptr<MetricEntity>()) {
+    FLAGS_block_manager = T::name();
+    bm_.reset(CreateBlockManager(scoped_refptr<MetricEntity>(), shared_ptr<MemTracker>()));
     CHECK_OK(file_cache_.Init());
   }
 
@@ -183,7 +180,7 @@ class BlockManagerTest : public KuduTest {
     opts.metric_entity = metric_entity;
     opts.parent_mem_tracker = parent_mem_tracker;
     error_manager_ = new FsErrorManager();
-    return new T(env_, this->dd_manager_.get(), error_manager_,
+    return new T(env_, dd_manager_, error_manager_,
                  &file_cache_, std::move(opts), fs::kDefaultTenantName);
   }
 
@@ -223,11 +220,16 @@ class BlockManagerTest : public KuduTest {
   void RunBlockDistributionTest(const vector<string>& paths);
 
   static Status CountFilesCb(int* num_files, Env::FileType type,
-                      const string& /*dirname*/,
-                      const string& basename) {
+                             const string& dirname,
+                             const string& basename) {
     if (basename == kInstanceMetadataFileName) {
       return Status::OK();
     }
+    // Ignore the 'rdb' directory which contains the RocksDB related files.
+    string parent_dir = *SplitPath(dirname).rbegin();
+    if (parent_dir == "rdb") {
+      return Status::OK();
+    }
     if (type == Env::FILE_TYPE) {
       *num_files += 1;
     }
@@ -235,7 +237,7 @@ class BlockManagerTest : public KuduTest {
   }
 
   // Utility function that counts the number of files within a directory
-  // hierarchy, ignoring '.', '..', and file 'kInstanceMetadataFileName'.
+  // hierarchy, ignoring '.', '..', 'rdb', and file 'kInstanceMetadataFileName'.
   Status CountFiles(const string& root, int* num_files) {
     *num_files = 0;
     return env_->Walk(
@@ -303,9 +305,8 @@ void BlockManagerTest<FileBlockManager>::RunBlockDistributionTest(const vector<s
   }
 }
 
-template <>
-void BlockManagerTest<LogBlockManagerNativeMeta>::RunBlockDistributionTest(
-    const vector<string>& paths) {
+template<typename T>
+void BlockManagerTest<T>::RunBlockDistributionTest(const vector<string>& paths) {
   vector<int> files_in_each_path(paths.size());
   int num_blocks_per_dir = 30;
   // Spread across 1, then 3, then 5 data directories.
@@ -383,8 +384,8 @@ void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>&
   ASSERT_EQ(20, num_blocks);
 }
 
-template <>
-void BlockManagerTest<LogBlockManagerNativeMeta>::RunMultipathTest(const vector<string>& paths) {
+template<typename T>
+void BlockManagerTest<T>::RunMultipathTest(const vector<string>& paths) {
   // Write (3 * numPaths * 2) blocks, in groups of (numPaths * 2). That should
   // yield two containers per path.
   CreateBlockOptions opts({ "multipath_test" });
@@ -402,9 +403,7 @@ void BlockManagerTest<LogBlockManagerNativeMeta>::RunMultipathTest(const vector<
   }
   ASSERT_OK(transaction->CommitCreatedBlocks());
 
-  // Verify the results. (numPaths * 2) containers were created, each
-  // consisting of 2 files. Thus, there should be a total of
-  // (numPaths * 4) files, ignoring '.', '..', and instance files.
+  // Verify the results.
   int sum = 0;
   for (const string& path : paths) {
     vector<string> children;
@@ -413,7 +412,20 @@ void BlockManagerTest<LogBlockManagerNativeMeta>::RunMultipathTest(const vector<
     ASSERT_OK(CountFiles(path, &files_in_path));
     sum += files_in_path;
   }
-  ASSERT_EQ(paths.size() * 4, sum);
+
+  if (std::is_same<T, LogBlockManagerNativeMeta>::value) {
+    // (numPaths * 2) containers were created, each consisting of 2 files.
+    // Thus, there should be a total of (numPaths * 4) files, ignoring '.',
+    // '..', and instance files.
+    ASSERT_EQ(paths.size() * 4, sum);
+  } else {
+    // (numPaths * 2) containers were created, each consisting of 1 file.
+    // Thus, there should be a total of (numPaths * 2) files, ignoring '.',
+    // '..', 'rdb', and instance files.
+    bool is_logr = std::is_same<T, LogBlockManagerRdbMeta>::value;
+    ASSERT_TRUE(is_logr);
+    ASSERT_EQ(paths.size() * 2, sum);
+  }
 }
 
 template <>
@@ -433,8 +445,8 @@ void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
   ASSERT_EQ(tracker->consumption(), initial_mem);
 }
 
-template <>
-void BlockManagerTest<LogBlockManagerNativeMeta>::RunMemTrackerTest() {
+template<typename T>
+void BlockManagerTest<T>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
   ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                tracker,
@@ -454,7 +466,8 @@ void BlockManagerTest<LogBlockManagerNativeMeta>::RunMemTrackerTest() {
 
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
-typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta> BlockManagers;
+typedef ::testing::Types<FileBlockManager, LogBlockManagerNativeMeta, LogBlockManagerRdbMeta>
+    BlockManagers;
 #else
 typedef ::testing::Types<FileBlockManager> BlockManagers;
 #endif
@@ -1216,6 +1229,13 @@ TYPED_TEST(BlockManagerTest, ConcurrentCloseFinalizedWritableBlockTest) {
 }
 
 TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
+  MetricRegistry registry;
+  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
+  ASSERT_OK(this->ReopenBlockManager(entity,
+                                     shared_ptr<MemTracker>(),
+                                     { this->test_dir_ },
+                                     false /* create */));
+
   // Create a BlockCreationTransaction. In this transaction,
   // create some blocks and commit the writes all together.
   const string kTestData = "test data";
@@ -1256,6 +1276,26 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
   }
   vector<BlockId> deleted_blocks;
   ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted_blocks));
+  deletion_transaction.reset();
+  this->dd_manager_->WaitOnClosures();
+
+  // Metric 'log_block_manager_holes_punched' is only updated when --block_manager
+  // is "log" or "logr".
+  int64_t holes_punched = 0;
+  if (FsManager::IsLogType(FLAGS_block_manager)) {
+    // Continuous blocks will be hole punched together, so 'log_block_manager_holes_punched' is
+    // less or equal to the size of 'deleted_blocks', but it must be greater than 0.
+    holes_punched = down_cast<Counter *>(
+        entity->FindOrNull(METRIC_log_block_manager_holes_punched).get())->value();
+    ASSERT_GE(deleted_blocks.size(), holes_punched);
+    ASSERT_GT(holes_punched, 0);
+  }
+
+  // Metric 'block_manager_total_blocks_deleted' is equal to the size of 'deleted_blocks'.
+  int64_t total_blocks_deleted = down_cast<Counter*>(
+      entity->FindOrNull(METRIC_block_manager_total_blocks_deleted).get())->value();
+  ASSERT_EQ(deleted_blocks.size(), total_blocks_deleted);
+
   for (const auto& block : deleted_blocks) {
     created_blocks.erase(std::remove(created_blocks.begin(), created_blocks.end(), block),
                          created_blocks.end());
@@ -1266,16 +1306,38 @@ TYPED_TEST(BlockManagerTest, TestBlockTransaction) {
   // in order to test that the first failure properly propagates.
   FLAGS_crash_on_eio = false;
   FLAGS_env_inject_eio = 1.0;
+  deletion_transaction = this->bm_->NewDeletionTransaction();
   for (const auto& block : created_blocks) {
     deletion_transaction->AddDeletedBlock(block);
   }
   deleted_blocks.clear();
   Status s = deletion_transaction->CommitDeletedBlocks(&deleted_blocks);
-  ASSERT_TRUE(s.IsIOError());
-  ASSERT_TRUE(deleted_blocks.empty());
-  ASSERT_STR_MATCHES(s.ToString(),
-                     Substitute("only 0/$0 blocks deleted, first failure",
-                                created_blocks.size()));
+  deletion_transaction.reset();
+  this->dd_manager_->WaitOnClosures();
+
+  if (FsManager::IsLogType(FLAGS_block_manager)) {
+    // Metric 'log_block_manager_holes_punched' is not changed because IO error injected.
+    ASSERT_EQ(holes_punched, down_cast<Counter*>(
+        entity->FindOrNull(METRIC_log_block_manager_holes_punched).get())->value());
+  }
+
+  // Because the "logr" block_manager uses RocksDB to store metadata, and the IO error injected
+  // into util/env_posix.cc does not affect RocksDB, updating the metadata stored in the
+  // logr-based block manager succeeds without any errors.
+  if (FLAGS_block_manager == "logr") {
+    ASSERT_OK(s);
+    ASSERT_EQ(created_blocks.size(), deleted_blocks.size());
+    ASSERT_EQ(total_blocks_deleted + deleted_blocks.size(), down_cast<Counter*>(
+        entity->FindOrNull(METRIC_block_manager_total_blocks_deleted).get())->value());
+  } else {
+    ASSERT_TRUE(s.IsIOError());
+    ASSERT_TRUE(deleted_blocks.empty());
+    ASSERT_EQ(total_blocks_deleted, down_cast<Counter*>(
+        entity->FindOrNull(METRIC_block_manager_total_blocks_deleted).get())->value());
+    ASSERT_STR_MATCHES(s.ToString(),
+                       Substitute("only 0/$0 blocks deleted, first failure",
+                                  created_blocks.size()));
+  }
 }
 
 } // namespace fs
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index 264d22507..380e5b6c4 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -200,7 +200,7 @@ class BlockManager : public RefCountedThreadSafe<BlockManager> {
   // Lists the available block manager types.
   static std::vector<std::string> block_manager_types() {
 #if defined(__linux__)
-    return { "file", "log" };
+    return { "file", "log", "logr" };
 #else
     return { "file" };
 #endif
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 20a73ddf0..2198a6c9b 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -39,6 +39,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
@@ -200,6 +201,10 @@ std::unique_ptr<Dir> DataDirManager::CreateNewDir(
     Env* env, DirMetrics* metrics, FsType fs_type,
     std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file,
     std::unique_ptr<ThreadPool> pool) {
+  if (FLAGS_block_manager == "logr") {
+    return std::make_unique<RdbDir>(env, metrics, fs_type, std::move(dir),
+                                    std::move(metadata_file), std::move(pool));
+  }
   return std::make_unique<Dir>(env, metrics, fs_type, std::move(dir),
                                std::move(metadata_file), std::move(pool));
 }
@@ -270,7 +275,7 @@ int DataDirManager::max_dirs() const {
 }
 
 Status DataDirManager::PopulateDirectoryMaps(const vector<unique_ptr<Dir>>& dirs) {
-  if (opts_.dir_type == "log") {
+  if (FsManager::IsLogType(opts_.dir_type)) {
     return DirManager::PopulateDirectoryMaps(dirs);
   }
   DCHECK_EQ("file", opts_.dir_type);
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index a0f489118..767ffa6cd 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -32,6 +32,12 @@
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
+#include <rocksdb/cache.h>
+#include <rocksdb/filter_policy.h>
+#include <rocksdb/options.h>
+#include <rocksdb/slice_transform.h>
+#include <rocksdb/status.h>
+#include <rocksdb/table.h>
 
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
@@ -39,6 +45,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/oid_generator.h"
@@ -47,9 +54,11 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util_prod.h"
 #include "kudu/util/threadpool.h"
 
 using std::set;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
@@ -59,9 +68,33 @@ using strings::Substitute;
 
 DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
+DECLARE_string(block_manager);
 
 namespace kudu {
 
+Status FromRdbStatus(const rocksdb::Status& s) {
+  switch (s.code()) {
+    case rocksdb::Status::kOk:
+      return Status::OK();
+    case rocksdb::Status::kNotFound:
+      return Status::NotFound(s.ToString());
+    case rocksdb::Status::kCorruption:
+      return Status::Corruption(s.ToString());
+    case rocksdb::Status::kNotSupported:
+      return Status::NotSupported(s.ToString());
+    case rocksdb::Status::kInvalidArgument:
+      return Status::InvalidArgument(s.ToString());
+    case rocksdb::Status::kIOError:
+      return Status::IOError(s.ToString());
+    case rocksdb::Status::kIncomplete:
+      return Status::Incomplete(s.ToString());
+    case rocksdb::Status::kAborted:
+      return Status::Aborted(s.ToString());
+    default:
+      return Status::RuntimeError(s.ToString());
+  }
+}
+
 namespace {
 
 // Wrapper for env_util::DeleteTmpFilesRecursively that is suitable for parallel
@@ -171,6 +204,105 @@ int Dir::reserved_bytes() {
   return FLAGS_fs_data_dirs_reserved_bytes;
 }
 
+shared_ptr<rocksdb::Cache> RdbDir::s_block_cache_;
+RdbDir::RdbDir(Env* env, DirMetrics* metrics,
+               FsType fs_type,
+               string dir,
+               unique_ptr<DirInstanceMetadataFile> metadata_file,
+               unique_ptr<ThreadPool> pool)
+    : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), std::move(pool)) {}
+
+Status RdbDir::Prepare() {
+  DCHECK_STREQ(FLAGS_block_manager.c_str(), "logr");
+  if (db_) {
+    // Some unit tests (e.g. BlockManagerTest.PersistenceTest) reopen the block manager,
+    // 'db_' is possible to be non-nullptr.
+    // In non-test environments, 'db_' is always to be nullptr, log a fatal error if not.
+    DCHECK(IsGTest()) <<
+        Substitute("It's not allowed to reopen the RocksDB $0 except in tests", dir_);
+    return Status::OK();
+  }
+
+  // See the rocksdb::Options details:
+  // https://github.com/facebook/rocksdb/blob/main/include/rocksdb/options.h
+  rocksdb::Options opts;
+  // A RocksDB instance is created if it does not exist when opening the Dir.
+  // TODO(yingchun): We should distinguish creating new data directory and opening existing data
+  //                 directory, and set proper options to avoid mishaps.
+  //                 When creating new data directory, set opts.error_if_exists = true.
+  //                 When opening existing data directory, set opts.create_if_missing = false.
+  opts.create_if_missing = true;
+  // TODO(yingchun): parameterize more rocksDB options, including:
+  //  opts.use_fsync
+  //  opts.error_if_exists
+  //  opts.db_log_dir
+  //  opts.wal_dir
+  //  opts.max_log_file_size
+  //  opts.keep_log_file_num
+  //  opts.max_manifest_file_size
+  //  opts.max_background_jobs
+  //  opts.write_buffer_size
+  //  opts.level0_file_num_compaction_trigger
+  //  opts.max_write_buffer_number
+
+  static std::once_flag flag;
+  std::call_once(flag, [&]() {
+    // TODO(yingchun): parameterize the rocksdb block cache size.
+    s_block_cache_ = rocksdb::NewLRUCache(10 << 20);
+  });
+  rocksdb::BlockBasedTableOptions tbl_opts;
+  tbl_opts.block_cache = s_block_cache_;
+  tbl_opts.whole_key_filtering = false;
+  // TODO(yingchun): parameterize these options.
+  tbl_opts.filter_policy.reset(rocksdb::NewBloomFilterPolicy(9.9));
+  opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));
+  // Take advantage of Prefix-Seek, see https://github.com/facebook/rocksdb/wiki/Prefix-Seek.
+  opts.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(ObjectIdGenerator::IdLength()));
+  opts.memtable_prefix_bloom_size_ratio = 0.1;
+
+  rdb_dir_ = JoinPathSegments(dir_, "rdb");
+  rocksdb::DB* db_temp = nullptr;
+  rocksdb::Status rdb_s = rocksdb::DB::Open(opts, *rdb_dir_, &db_temp);
+  RETURN_NOT_OK_PREPEND(FromRdbStatus(rdb_s),
+                        Substitute("open RocksDB failed, path: $0", *rdb_dir_));
+  db_.reset(db_temp);
+  return Status::OK();
+}
+
+void RdbDir::Shutdown() {
+  if (is_shutdown_) {
+    return;
+  }
+
+  // Shut down the thread pool before closing RocksDB to make sure there aren't any in-flight
+  // write operations.
+  WaitOnClosures();
+  pool_->Shutdown();
+
+  // The 'db_' is nullptr if the Dir open failed.
+  if (db_) {
+    // Flushing memtable before closing RocksDB reduces bootstrapping time upon next start-up.
+    // Call Flush() rather than WaitForCompact(), because it's enough to wait for the flush jobs
+    // to finish, compactions jobs may take more time, which results in longer times to shut down
+    // a server.
+    rocksdb::FlushOptions options;
+    options.wait = true;
+    WARN_NOT_OK(FromRdbStatus(db_->Flush(options)),
+                Substitute("Flush RocksDB failed, path: $0", *rdb_dir_));
+    WARN_NOT_OK(FromRdbStatus(db_->Close()),
+                Substitute("Closed RocksDB with error, path: $0", *rdb_dir_));
+    db_.reset();
+  }
+
+  is_shutdown_ = true;
+}
+
+rocksdb::DB* RdbDir::rdb() {
+  DCHECK_STREQ(FLAGS_block_manager.c_str(), "logr");
+  DCHECK(db_);
+  return db_.get();
+}
+
 DirManagerOptions::DirManagerOptions(string dir_type,
                                      string tid)
     : dir_type(std::move(dir_type)),
@@ -631,6 +763,15 @@ Dir* DirManager::FindDirByUuidIndex(int uuid_idx) const {
   return FindPtrOrNull(dir_by_uuid_idx_, uuid_idx);
 }
 
+Dir* DirManager::FindDirByFullPathForTests(const std::string& full_path) const {
+  for (const auto& d : dirs_) {
+    if (HasPrefixString(full_path, d->dir())) {
+      return d.get();
+    }
+  }
+  return nullptr;
+}
+
 bool DirManager::FindUuidIndexByDir(Dir* dir, int* uuid_idx) const {
   return FindCopy(uuid_idx_by_dir_, dir, uuid_idx);
 }
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index a6f14ff9d..85cca3be5 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -22,11 +22,15 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <optional>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
+#include <gtest/gtest_prod.h>
+#include <rocksdb/db.h>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
@@ -35,8 +39,18 @@
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
+namespace rocksdb {
+class Cache;
+class Status;
+} // namespace rocksdb
+
 namespace kudu {
 
+// Convert a rocksdb::Status to a kudu::Status.
+// NOTE: Keep it here rather than util/status.h because the latter is
+// an exported header, but FromRdbStatus() is used only internally.
+Status FromRdbStatus(const rocksdb::Status& s);
+
 class Env;
 class ThreadPool;
 
@@ -104,9 +118,12 @@ class Dir {
       std::unique_ptr<ThreadPool> pool);
   virtual ~Dir();
 
+  // Some preparatory work before opening the directory.
+  virtual Status Prepare() { return Status::OK(); }
+
   // Shuts down this dir's thread pool, waiting for any closures submitted via
   // ExecClosure() to finish first.
-  void Shutdown();
+  virtual void Shutdown();
 
   // Run a task on this dir's thread pool.
   //
@@ -159,7 +176,7 @@ class Dir {
   // value of -1 means 1% of the disk space in a directory will be reserved.
   static int reserved_bytes();
 
- private:
+ protected:
   Env* env_;
   DirMetrics* metrics_;
   const FsType fs_type_;
@@ -180,6 +197,37 @@ class Dir {
   DISALLOW_COPY_AND_ASSIGN(Dir);
 };
 
+// Representation of a directory used specifically by LogBlockManagerRdbMeta.
+class RdbDir : public Dir {
+ public:
+  RdbDir(Env* env,
+         DirMetrics* metrics,
+         FsType fs_type,
+         std::string dir,
+         std::unique_ptr<DirInstanceMetadataFile> metadata_file,
+         std::unique_ptr<ThreadPool> pool);
+
+  // Initialize the RocksDB instance for the directory.
+  //
+  // Returns Status::OK() if prepared successfully, otherwise returns non-OK.
+  Status Prepare() override;
+
+  // Similar to Dir::Shutdown(), but additionally closes the RocksDB instance.
+  void Shutdown() override;
+
+  rocksdb::DB* rdb();
+
+ private:
+  // The shared RocksDB instance for this directory.
+  std::unique_ptr<rocksdb::DB> db_;
+  // The full path of the RocksDB directory.
+  std::optional<std::string> rdb_dir_;
+  // The block cache shared by all RocksDB instances in all data directories.
+  static std::shared_ptr<rocksdb::Cache> s_block_cache_;
+
+  DISALLOW_COPY_AND_ASSIGN(RdbDir);
+};
+
 struct DirManagerOptions {
  public:
   // The type of directory this directory manager should support.
@@ -304,6 +352,8 @@ class DirManager {
                                             std::unique_ptr<ThreadPool> pool) = 0;
 
  protected:
+  FRIEND_TEST(LogBlockManagerRdbMetaTest, TestHalfPresentContainer);
+
   // The name to be used by this directory manager for each sub-directory of
   // each directory root.
   virtual const char* dir_name() const = 0;
@@ -397,6 +447,11 @@ class DirManager {
       const std::vector<std::unique_ptr<DirInstanceMetadataFile>>& instances_to_update,
       const std::set<std::string>& new_all_uuids);
 
+  // Finds a directory by full path name, returning null if it can't be found.
+  //
+  // NOTE: Only for test purposes.
+  Dir* FindDirByFullPathForTests(const std::string& full_path) const;
+
   // The environment to be used for all directory operations.
   Env* env_;
 
diff --git a/src/kudu/fs/dir_util.cc b/src/kudu/fs/dir_util.cc
index a65a79bb0..206e5b59f 100644
--- a/src/kudu/fs/dir_util.cc
+++ b/src/kudu/fs/dir_util.cc
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 #include "kudu/fs/dir_util.h"
 
 #include <cstdint>
@@ -25,6 +26,7 @@
 #include <glog/logging.h>
 
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -168,7 +170,7 @@ Status DirInstanceMetadataFile::Create(const set<string>& all_uuids,
 
   // If we're initializing the log block manager, check that we support
   // hole-punching.
-  if (dir_type_ == "log") {
+  if (FsManager::IsLogType(dir_type_)) {
     RETURN_NOT_OK_FAIL_INSTANCE_PREPEND(CheckHolePunch(env_, dir_name),
                                         kHolePunchErrorMsg);
   }
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 81ab17561..bafee8e3d 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -71,6 +71,8 @@ struct BlockManagerMetrics;
 // The file-backed block manager.
 class FileBlockManager : public BlockManager {
  public:
+  static constexpr const char* const name() { return "file"; }
+
   // Note: all objects passed as pointers should remain alive for the lifetime
   // of the block manager.
   FileBlockManager(Env* env,
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 4bebe106d..7da7803a2 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -62,11 +62,13 @@
 #include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using kudu::pb_util::ReadPBContainerFromPath;
 using kudu::pb_util::SecureDebugString;
+using std::make_tuple;
 using std::nullopt;
 using std::shared_ptr;
 using std::string;
@@ -216,9 +218,23 @@ class FsManagerTestBase : public KuduTest,
 };
 
 INSTANTIATE_TEST_SUITE_P(BlockManagerTypes, FsManagerTestBase,
-    ::testing::Combine(
-        ::testing::ValuesIn(BlockManager::block_manager_types()),
-        ::testing::ValuesIn(kEncryptionType)));
+// TODO(yingchun): When --enable_multi_tenancy is set, the data directories are still shared by
+//  all tenants, which will cause some errors when --block_manager=logr. This will be fixed in
+//  the future per the TODO mentioned in [1]; we can enable all the following test cases once
+//  that TODO is addressed.
+//  1. https://github.com/acelyc111/kudu/blob/master/src/kudu/fs/fs_manager.cc#L1190
+//
+//    ::testing::ValuesIn(BlockManager::block_manager_types()),
+//    ::testing::ValuesIn(kEncryptionType))
+    ::testing::Values(
+    make_tuple("file", kEncryptionType[0]),
+    make_tuple("file", kEncryptionType[1]),
+    make_tuple("file", kEncryptionType[2]),
+    make_tuple("log", kEncryptionType[0]),
+    make_tuple("log", kEncryptionType[1]),
+    make_tuple("log", kEncryptionType[2]),
+    make_tuple("logr", kEncryptionType[0]),
+    make_tuple("logr", kEncryptionType[1])));
 
 TEST_P(FsManagerTestBase, TestBaseOperations) {
   fs_manager()->DumpFileSystemTree(std::cout, tenant_id());
@@ -497,8 +513,7 @@ TEST_P(FsManagerTestBase, TestMetadataDirInDataRoot) {
   // Now let's test adding data directories with metadata in the data root.
   // Adding data directories is not supported by the file block manager.
   if (FLAGS_block_manager == "file") {
-    LOG(INFO) << "Skipping the rest of test, file block manager not supported";
-    return;
+    GTEST_SKIP() << "Skipping the rest of test, file block manager not supported";
   }
 
   // Adding a data dir to the front of the FS root list (i.e. such that the
@@ -1229,12 +1244,20 @@ TEST_P(FsManagerTestBase, TestAddRemoveSpeculative) {
 }
 
 TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
-  const int kNumAttempts = AllowSlowTests() ? 1000 : 100;
-
   if (FLAGS_block_manager == "file") {
     GTEST_SKIP() << "Skipping test, file block manager not supported";
   }
 
+#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+  // When using a sanitizer, reduce the number of iterations for more stable results.
+  const int kNumAttempts = 50;
+#else
+  // In some situations the test would take too long, so we reduce the number of iterations
+  // unless AllowSlowTests(). For example, when FLAGS_block_manager == "logr", opening a data
+  // directory also opens a RocksDB instance, which takes more time than when
+  // FLAGS_block_manager == "log".
+  const int kNumAttempts = AllowSlowTests() ? 1000 : 50;
+#endif
+
   Random rng_(SeedRandom());
 
   FsManagerOpts fs_opts;
@@ -1263,7 +1286,7 @@ TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
     }
 
     // Try to add or remove it with failure injection enabled.
-    LOG(INFO) << Substitute("$0ing $1", action_was_add ? "Add" : "Remov", fs_root);
+    SCOPED_LOG_TIMING(INFO, Substitute("$0ing $1", action_was_add ? "add" : "remov", fs_root));
     bool update_succeeded;
     {
       google::FlagSaver saver;
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 702ca2b1b..49e67de23 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -80,8 +80,9 @@ TAG_FLAG(enable_data_block_fsync, unsafe);
 
 #if defined(__linux__)
 DEFINE_string(block_manager, "log", "Which block manager to use for storage. "
-              "Valid options are 'file' and 'log'. The file block manager is not suitable for "
-              "production use due to scaling limitations.");
+              "Valid options are 'file', 'log' and 'logr'. The file block "
+              "manager is not suitable for production use due to scaling "
+              "limitations.");
 #else
 DEFINE_string(block_manager, "file", "Which block manager to use for storage. "
              "Only the file block manager is supported for non-Linux systems.");
@@ -176,6 +177,7 @@ using kudu::fs::FsErrorManager;
 using kudu::fs::FileBlockManager;
 using kudu::fs::FsReport;
 using kudu::fs::LogBlockManagerNativeMeta;
+using kudu::fs::LogBlockManagerRdbMeta;
 using kudu::fs::ReadableBlock;
 using kudu::fs::UpdateInstanceBehavior;
 using kudu::fs::WritableBlock;
@@ -407,6 +409,10 @@ scoped_refptr<BlockManager> FsManager::InitBlockManager(const string& tenant_id)
     block_manager.reset(new LogBlockManagerNativeMeta(
         GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
         opts_.file_cache, std::move(bm_opts), tenant_id));
+  } else if (opts_.block_manager_type == "logr") {
+    block_manager.reset(new LogBlockManagerRdbMeta(
+        GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
+        opts_.file_cache, std::move(bm_opts), tenant_id));
   } else {
     LOG(FATAL) << "Unknown block_manager_type: " << opts_.block_manager_type;
   }
@@ -611,7 +617,7 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files,
                                            BlockManager::MergeReport::REQUIRED));
     if (read_data_directories) {
       read_data_directories->Stop();
-      if (opts_.metric_entity && opts_.block_manager_type == "log") {
+      if (opts_.metric_entity && FsManager::IsLogType(opts_.block_manager_type)) {
         METRIC_log_block_manager_containers_processing_time_startup.Instantiate(opts_.metric_entity,
             (read_data_directories->TimeElapsed()).ToMilliseconds());
       }
@@ -790,6 +796,10 @@ void FsManager::UpdateMetadataFormatAndStampUnlock(InstanceMetadataPB* metadata)
   metadata->set_format_stamp(Substitute("Formatted at $0 on $1", time_str, hostname));
 }
 
+bool FsManager::IsLogType(const std::string& block_manager_type) {
+  return (block_manager_type == "log" || block_manager_type == "logr");
+}
+
 Status FsManager::AddTenantMetadata(const string& tenant_name,
                                     const string& tenant_id,
                                     const string& tenant_key,
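The new FsManager::IsLogType() helper treats both the native-metadata ("log") and RocksDB-metadata ("logr") block managers as log-backed. Restated standalone for clarity (same logic as the diff above):

```cpp
#include <cassert>
#include <string>

// Mirrors the FsManager::IsLogType() predicate added in this patch: both
// "log" and "logr" are log-backed block managers, while "file" is not.
bool IsLogType(const std::string& block_manager_type) {
  return block_manager_type == "log" || block_manager_type == "logr";
}
```

This is why the hole-punch check in dir_util.cc and the startup metric in fs_manager.cc now key off IsLogType() rather than comparing against "log" directly.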
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 7abdb9fac..4a18962b5 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -447,6 +447,9 @@ class FsManager {
     return meta_on_xfs_;
   }
 
+  // Return true if 'block_manager_type' represents a log-backed block manager.
+  static bool IsLogType(const std::string& block_manager_type);
+
  private:
   FRIEND_TEST(fs::FsManagerTestBase, TestDuplicatePaths);
   FRIEND_TEST(fs::FsManagerTestBase, TestEIOWhileRunningUpdateDirsTool);
diff --git a/src/kudu/fs/fs_report.cc b/src/kudu/fs/fs_report.cc
index e4ee60440..b4f7986f0 100644
--- a/src/kudu/fs/fs_report.cc
+++ b/src/kudu/fs/fs_report.cc
@@ -252,6 +252,35 @@ LBMPartialRecordCheck::Entry::Entry(string c, int64_t o)
       repaired(false) {
 }
 
+///////////////////////////////////////////////////////////////////////////////
+// LBMCorruptedRdbRecordCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMCorruptedRdbRecordCheck::MergeFrom(
+    const LBMCorruptedRdbRecordCheck& other) {
+  MERGE_ENTRIES_FROM(other);
+}
+
+string LBMCorruptedRdbRecordCheck::ToString() const {
+  // Aggregate interesting stats from all of the entries.
+  int64_t corrupted_records_repaired = 0;
+  for (const auto& pr : entries) {
+    if (pr.repaired) {
+      corrupted_records_repaired++;
+    }
+  }
+
+  return Substitute("Total corrupted LBM metadata records in RocksDB: $0 ($1 repaired)\n",
+                    entries.size(), corrupted_records_repaired);
+}
+
+LBMCorruptedRdbRecordCheck::Entry::Entry(string c, string k)
+    : container(std::move(c)),
+      rocksdb_key(std::move(k)),
+      repaired(false) {
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // FsReport::Stats
 ///////////////////////////////////////////////////////////////////////////////
@@ -301,6 +330,7 @@ void FsReport::MergeFrom(const FsReport& other) {
   MERGE_ONE_CHECK(malformed_record_check);
   MERGE_ONE_CHECK(misaligned_block_check);
   MERGE_ONE_CHECK(partial_record_check);
+  MERGE_ONE_CHECK(corrupted_rdb_record_check);
 
 #undef MERGE_ONE_CHECK
 }
@@ -329,6 +359,7 @@ string FsReport::ToString() const {
   TOSTRING_ONE_CHECK(malformed_record_check, "malformed LBM records");
   TOSTRING_ONE_CHECK(misaligned_block_check, "misaligned LBM blocks");
   TOSTRING_ONE_CHECK(partial_record_check, "partial LBM records");
+  TOSTRING_ONE_CHECK(corrupted_rdb_record_check, "corrupted LBM rdb records");
 
 #undef TOSTRING_ONE_CHECK
   return s;
diff --git a/src/kudu/fs/fs_report.h b/src/kudu/fs/fs_report.h
index 0ff43e674..fc983d847 100644
--- a/src/kudu/fs/fs_report.h
+++ b/src/kudu/fs/fs_report.h
@@ -173,6 +173,26 @@ struct LBMPartialRecordCheck {
   std::vector<Entry> entries;
 };
 
+// Checks for corrupted LBM metadata records in RocksDB.
+//
+// Error type: non-fatal and repairable (by removing the records from RocksDB).
+struct LBMCorruptedRdbRecordCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const LBMCorruptedRdbRecordCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(std::string c, std::string k);
+    std::string container;
+    std::string rocksdb_key;
+    bool repaired;
+  };
+  std::vector<Entry> entries;
+};
+
 // Results of a Kudu filesystem-wide check. The report contains general
 // statistics about the filesystem as well as a series of "checks" that
 // describe possible on-disk inconsistencies.
@@ -268,6 +288,7 @@ struct FsReport {
   std::optional<LBMMalformedRecordCheck> malformed_record_check;
   std::optional<LBMMisalignedBlockCheck> misaligned_block_check;
   std::optional<LBMPartialRecordCheck> partial_record_check;
+  std::optional<LBMCorruptedRdbRecordCheck> corrupted_rdb_record_check;
 };
 
 } // namespace fs
diff --git a/src/kudu/fs/log_block_manager-test-util.cc b/src/kudu/fs/log_block_manager-test-util.cc
index 79147b3d6..c2aa8bb89 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -28,10 +28,16 @@
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
+#include <rocksdb/db.h>
+#include <rocksdb/options.h>
+#include <rocksdb/slice.h>
 
 #include "kudu/fs/block_id.h"
+#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/casts.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
@@ -59,19 +65,23 @@ using strings::Substitute;
 // LBMCorruptor to trace the LogBlockManagerNativeMeta data corruption.
 class NativeMetadataLBMCorruptor : public LBMCorruptor {
  public:
-  NativeMetadataLBMCorruptor(Env* env, vector<string> data_dirs, uint32_t rand_seed)
-      : LBMCorruptor(env, std::move(data_dirs), rand_seed) {
+  NativeMetadataLBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t rand_seed)
+      : LBMCorruptor(env, dd_manager, rand_seed) {
     max_malformed_types_ = MalformedRecordType::kTwoCreatesForSameBlockId;
   }
 
  private:
   Status CreateIncompleteContainer() override;
 
-  Status AppendRecord(const Container* c, BlockId block_id,
-                      int64_t block_offset, int64_t block_length) override;
+  Status AppendRecord(const Container* c,
+                      BlockId block_id,
+                      int64_t block_offset,
+                      int64_t block_length) override;
 
-  Status AppendCreateRecord(const Container* c, BlockId block_id,
-                            uint64_t block_offset, int64_t block_length) override;
+  Status AppendCreateRecord(const Container* c,
+                            BlockId block_id,
+                            uint64_t block_offset,
+                            int64_t block_length) override;
 
   Status AppendPartialRecord(const Container* c, BlockId block_id) override;
 
@@ -79,58 +89,100 @@ class NativeMetadataLBMCorruptor : public LBMCorruptor {
 
   Status CreateContainerMetadataPart(const string& unsuffixed_path) const;
 
-  static Status AppendCreateRecordInternal(WritablePBContainerFile* writer, BlockId block_id,
-                                           int64_t block_offset, int64_t block_length);
+  static Status AppendCreateRecordInternal(WritablePBContainerFile* writer,
+                                           BlockId block_id,
+                                           int64_t block_offset,
+                                           int64_t block_length);
   static Status AppendDeleteRecordInternal(WritablePBContainerFile* writer, BlockId block_id);
 
-  Status CreateMalformedRecord(
-      const Container* c, MalformedRecordType error_type, BlockRecordPB* record) override;
+  Status CreateMalformedRecord(const Container* c,
+                               MalformedRecordType error_type,
+                               BlockRecordPB* record) override;
 };
 
-unique_ptr<LBMCorruptor> LBMCorruptor::Create(
-    Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed) {
-  if (FLAGS_block_manager == "log") {
-    return std::make_unique<NativeMetadataLBMCorruptor>(env, std::move(data_dirs), rand_seed);
+// LBMCorruptor to trace the LogBlockManagerRdbMeta data corruption.
+class RdbMetadataLBMCorruptor : public LBMCorruptor {
+ public:
+  RdbMetadataLBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t rand_seed)
+      : LBMCorruptor(env, dd_manager, rand_seed) {
+    max_malformed_types_ = MalformedRecordType::kUnrecognizedOpType;
   }
-  return nullptr;
+
+ private:
+  Status CreateIncompleteContainer() override;
+
+  Status AppendRecord(const Container* c,
+                      BlockId block_id,
+                      int64_t block_offset,
+                      int64_t block_length) override;
+
+  Status AppendCreateRecord(const Container* c,
+                            BlockId block_id,
+                            uint64_t block_offset,
+                            int64_t block_length) override;
+
+  Status AppendPartialRecord(const Container* c, BlockId block_id) override;
+
+  Status AppendMetadataRecord(const Container* c, const BlockRecordPB& record) override;
+
+  static Status AppendCreateRecordInternal(RdbDir* dir,
+                                           const string& id,
+                                           BlockId block_id,
+                                           int64_t block_offset,
+                                           int64_t block_length);
+  static Status AppendDeleteRecordInternal(RdbDir* dir, const string& id, BlockId block_id);
+
+  // Get the RdbDir for the given container.
+  RdbDir* GetRdbDir(const Container* c) const;
+};
+
+unique_ptr<LBMCorruptor> LBMCorruptor::Create(
+    Env* env, DataDirManager* dd_manager, uint32_t rand_seed) {
+  if (FLAGS_block_manager == "log") {
+    return std::make_unique<NativeMetadataLBMCorruptor>(env, dd_manager, rand_seed);
+  }
+  if (FLAGS_block_manager == "logr") {
+    return std::make_unique<RdbMetadataLBMCorruptor>(env, dd_manager, rand_seed);
+  }
+  return nullptr;
 }
 
-LBMCorruptor::LBMCorruptor(Env* env, vector<string> data_dirs, uint32_t rand_seed)
+LBMCorruptor::LBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t rand_seed)
     : env_(env),
-      data_dirs_(std::move(data_dirs)),
+      dd_manager_(dd_manager),
       rand_(rand_seed) {
-  CHECK_GT(data_dirs_.size(), 0);
+  CHECK_GT(dd_manager_->dirs().size(), 0);
 }
 
 Status LBMCorruptor::Init() {
   vector<Container> all_containers;
   vector<Container> full_containers;
 
-  for (const auto& dd : data_dirs_) {
+  for (const auto& dd : dd_manager_->dirs()) {
     vector<string> dd_files;
     unordered_map<string, Container> containers_by_name;
-    RETURN_NOT_OK(env_->GetChildren(dd, &dd_files));
+    RETURN_NOT_OK(env_->GetChildren(dd->dir(), &dd_files));
     for (const auto& f : dd_files) {
       // As we iterate over each file in the data directory, keep track of data
       // and metadata files, so that only containers with both will be included.
       string stripped;
       if (TryStripSuffixString(
           f, LogBlockManager::kContainerDataFileSuffix, &stripped)) {
-        containers_by_name[stripped].dir = dd;
+        containers_by_name[stripped].dir = dd->dir();
         containers_by_name[stripped].name = stripped;
-        containers_by_name[stripped].data_filename = JoinPathSegments(dd, f);
+        containers_by_name[stripped].data_filename = JoinPathSegments(dd->dir(), f);
       } else if (TryStripSuffixString(
           f, LogBlockManager::kContainerMetadataFileSuffix, &stripped)) {
-        containers_by_name[stripped].dir = dd;
+        containers_by_name[stripped].dir = dd->dir();
         containers_by_name[stripped].name = stripped;
-        containers_by_name[stripped].metadata_filename = JoinPathSegments(dd, f);
+        containers_by_name[stripped].metadata_filename = JoinPathSegments(dd->dir(), f);
       }
     }
 
     for (const auto& e : containers_by_name) {
-      // Only include the container if both of its files were present.
+      // Only include the container if its required files were present.
       if (!e.second.data_filename.empty() &&
-          !e.second.metadata_filename.empty()) {
+          (!e.second.metadata_filename.empty() || FLAGS_block_manager == "logr")) {
         all_containers.push_back(e.second);
 
         // File size is an imprecise proxy for whether a container is full, but
@@ -267,8 +319,9 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
   return Status::OK();
 }
 
-Status LBMCorruptor::CreateMalformedRecord(
-    const Container* /* c */, MalformedRecordType error_type, BlockRecordPB* record) {
+Status LBMCorruptor::CreateMalformedRecord(const Container* /* c */,
+                                           MalformedRecordType error_type,
+                                           BlockRecordPB* record) {
   switch (error_type) {
     case MalformedRecordType::kNoBlockOffset:
       record->clear_offset();
@@ -463,8 +516,8 @@ Status LBMCorruptor::GetRandomContainer(FindContainerMode mode,
   return Status::OK();
 }
 
-const string& LBMCorruptor::GetRandomDataDir() const {
-  return data_dirs_[rand_.Uniform(data_dirs_.size())];
+string LBMCorruptor::GetRandomDataDir() const {
+  return dd_manager_->GetDirs()[rand_.Uniform(dd_manager_->GetDirs().size())];
 }
 
 Status NativeMetadataLBMCorruptor::CreateIncompleteContainer() {
@@ -488,8 +541,10 @@ Status NativeMetadataLBMCorruptor::CreateIncompleteContainer() {
   return Status::OK();
 }
 
-Status NativeMetadataLBMCorruptor::AppendRecord(
-    const Container* c, BlockId block_id, int64_t block_offset, int64_t block_length) {
+Status NativeMetadataLBMCorruptor::AppendRecord(const Container* c,
+                                                BlockId block_id,
+                                                int64_t block_offset,
+                                                int64_t block_length) {
   unique_ptr<WritablePBContainerFile> metadata_writer;
   RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
   RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id,
@@ -498,8 +553,10 @@ Status NativeMetadataLBMCorruptor::AppendRecord(
   return metadata_writer->Close();
 }
 
-Status NativeMetadataLBMCorruptor::AppendCreateRecord(
-    const Container* c, BlockId block_id, uint64_t block_offset, int64_t block_length) {
+Status NativeMetadataLBMCorruptor::AppendCreateRecord(const Container* c,
+                                                      BlockId block_id,
+                                                      uint64_t block_offset,
+                                                      int64_t block_length) {
   unique_ptr<WritablePBContainerFile> metadata_writer;
   RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
   RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id,
@@ -545,8 +602,9 @@ Status NativeMetadataLBMCorruptor::CreateContainerMetadataPart(
   return metadata_file->Close();
 }
 
-Status NativeMetadataLBMCorruptor::CreateMalformedRecord(
-    const Container* c, MalformedRecordType error_type, BlockRecordPB* record) {
+Status NativeMetadataLBMCorruptor::CreateMalformedRecord(const Container* c,
+                                                         MalformedRecordType error_type,
+                                                         BlockRecordPB* record) {
   switch (error_type) {
     case MalformedRecordType::kDeleteWithoutFirstMatchingCreate:
       record->clear_offset();
@@ -562,9 +620,10 @@ Status NativeMetadataLBMCorruptor::CreateMalformedRecord(
   return Status::OK();
 }
 
-Status NativeMetadataLBMCorruptor::AppendCreateRecordInternal(
-    WritablePBContainerFile* writer, BlockId block_id,
-    int64_t block_offset, int64_t block_length) {
+Status NativeMetadataLBMCorruptor::AppendCreateRecordInternal(WritablePBContainerFile* writer,
+                                                              BlockId block_id,
+                                                              int64_t block_offset,
+                                                              int64_t block_length) {
   BlockRecordPB record;
   block_id.CopyToPB(record.mutable_block_id());
   record.set_op_type(CREATE);
@@ -583,5 +642,98 @@ Status NativeMetadataLBMCorruptor::AppendDeleteRecordInternal(
   return writer->Append(record);
 }
 
+Status RdbMetadataLBMCorruptor::AppendPartialRecord(
+    const Container* c, const BlockId block_id) {
+  auto* rdb_dir = GetRdbDir(c);
+  DCHECK(rdb_dir);
+  // Add a new good record to the container.
+  RETURN_NOT_OK(AppendCreateRecordInternal(rdb_dir, c->name, block_id, 0, 0));
+  // Corrupt the record by truncating one byte off the end of it.
+  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(c->name, block_id));
+  rocksdb::Slice slice_key(key);
+  string value;
+  RETURN_NOT_OK(FromRdbStatus(rdb_dir->rdb()->Get(
+      rocksdb::ReadOptions(), slice_key, &value)));
+  return FromRdbStatus(rdb_dir->rdb()->Put(
+      rocksdb::WriteOptions(), slice_key, rocksdb::Slice(value.data(), value.size() - 1)));
+}
+
+Status RdbMetadataLBMCorruptor::AppendMetadataRecord(
+    const Container* c, const BlockRecordPB& record) {
+  auto* rdb_dir = GetRdbDir(c);
+  DCHECK(rdb_dir);
+  string buf;
+  record.SerializeToString(&buf);
+  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(
+      c->name, BlockId::FromPB(record.block_id())));
+  return FromRdbStatus(rdb_dir->rdb()->Put(
+      rocksdb::WriteOptions(), rocksdb::Slice(key), rocksdb::Slice(buf)));
+}
+
+RdbDir* RdbMetadataLBMCorruptor::GetRdbDir(const Container* c) const {
+  DCHECK(c);
+  for (const auto& dir : dd_manager_->dirs()) {
+    if (dir->dir() == c->dir) {
+      DCHECK(dir);
+      return down_cast<RdbDir*>(dir.get());
+    }
+  }
+  return nullptr;
+}
+
+Status RdbMetadataLBMCorruptor::AppendRecord(const Container* c,
+                                             const BlockId block_id,
+                                             int64_t block_offset,
+                                             int64_t block_length) {
+  auto* rdb_dir = GetRdbDir(c);
+  DCHECK(rdb_dir);
+  RETURN_NOT_OK(AppendCreateRecordInternal(rdb_dir, c->name, block_id, block_offset, block_length));
+  return AppendDeleteRecordInternal(rdb_dir, c->name, block_id);
+}
+
+Status RdbMetadataLBMCorruptor::CreateIncompleteContainer() {
+  string unsuffixed_path = JoinPathSegments(GetRandomDataDir(), oid_generator_.Next());
+  // Create an incomplete container. Types of incomplete containers:
+  //
+  // 0. Empty data file but no metadata file.
+  return CreateContainerDataPart(unsuffixed_path);
+}
+
+Status RdbMetadataLBMCorruptor::AppendCreateRecord(const Container* c,
+                                                   const BlockId block_id,
+                                                   uint64_t block_offset,
+                                                   int64_t block_length) {
+  auto* rdb_dir = GetRdbDir(c);
+  DCHECK(rdb_dir);
+  return AppendCreateRecordInternal(rdb_dir, c->name, block_id,
+                                    block_offset, block_length);
+}
+
+Status RdbMetadataLBMCorruptor::AppendCreateRecordInternal(RdbDir* dir,
+                                                           const string& id,
+                                                           BlockId block_id,
+                                                           int64_t block_offset,
+                                                           int64_t block_length) {
+  // Construct value.
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(block_offset);
+  record.set_length(block_length);
+  record.set_timestamp_us(0); // has no effect
+  string value;
+  record.SerializeToString(&value);
+
+  // Construct key.
+  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(id, BlockId::FromPB(record.block_id())));
+  return FromRdbStatus(dir->rdb()->Put({}, rocksdb::Slice(key), rocksdb::Slice(value)));
+}
+
+Status RdbMetadataLBMCorruptor::AppendDeleteRecordInternal(
+    RdbDir* dir, const string& id, BlockId block_id) {
+  string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(id, block_id));
+  return FromRdbStatus(dir->rdb()->Delete({}, rocksdb::Slice(key)));
+}
+
 } // namespace fs
 } // namespace kudu
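The AppendPartialRecord() corruption above reads a stored value back and rewrites it with the last byte chopped off, producing an unparseable record. The same trick in isolation, with a plain map standing in for the RocksDB Get()/Put() pair (illustrative only, not Kudu code):

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy version of the truncation corruption: fetch the stored value and
// rewrite it minus its last byte, as AppendPartialRecord() does via
// rocksdb::DB::Get() followed by rocksdb::DB::Put().
void TruncateLastByte(std::map<std::string, std::string>* kv,
                      const std::string& key) {
  std::string value = kv->at(key);
  (*kv)[key] = value.substr(0, value.size() - 1);
}
```

A truncated protobuf value like this is what the new LBMCorruptedRdbRecordCheck is designed to detect and repair at startup.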
diff --git a/src/kudu/fs/log_block_manager-test-util.h b/src/kudu/fs/log_block_manager-test-util.h
index ff3dfd20b..f8661ad73 100644
--- a/src/kudu/fs/log_block_manager-test-util.h
+++ b/src/kudu/fs/log_block_manager-test-util.h
@@ -37,13 +37,14 @@ class WritablePBContainerFile;
 } // namespace pb_util
 
 namespace fs {
+class DataDirManager;
 
 // Corrupts various log block manager on-disk data structures.
 class LBMCorruptor {
  public:
   // Create a LBMCorruptor according to the --block_manager flag.
   static std::unique_ptr<LBMCorruptor> Create(
-      Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed);
+      Env* env, DataDirManager* dd_manager, uint32_t rand_seed);
 
   virtual ~LBMCorruptor() = default;
 
@@ -103,8 +104,13 @@ class LBMCorruptor {
   // Injects one of the above non-fatal inconsistencies (chosen at random).
   Status InjectRandomNonFatalInconsistency();
 
+  // Resets the DataDirManager when the bound DataDirManager changes.
+  void ResetDataDirManager(DataDirManager* dd_manager) {
+    dd_manager_ = dd_manager;
+  }
+
  protected:
-  LBMCorruptor(Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed);
+  LBMCorruptor(Env* env, DataDirManager* dd_manager, uint32_t rand_seed);
 
   // Describes an on-disk LBM container.
   struct Container {
@@ -138,17 +144,21 @@ class LBMCorruptor {
                             const Container** container) const;
 
   // Gets a data directory chosen at random.
-  const std::string& GetRandomDataDir() const;
+  std::string GetRandomDataDir() const;
 
   // Appends a CREATE-DELETE pair of records to the container 'c', the newly
   // created record has a unique id of 'block_id' and is located at
   // 'block_offset' with a size of 'block_length'.
-  virtual Status AppendRecord(const Container* c, BlockId block_id,
-                              int64_t block_offset, int64_t block_length) = 0;
+  virtual Status AppendRecord(const Container* c,
+                              BlockId block_id,
+                              int64_t block_offset,
+                              int64_t block_length) = 0;
 
   // Similar to the above, but only appends the CREATE record.
-  virtual Status AppendCreateRecord(const Container* c, BlockId block_id,
-                                    uint64_t block_offset, int64_t block_length) = 0;
+  virtual Status AppendCreateRecord(const Container* c,
+                                    BlockId block_id,
+                                    uint64_t block_offset,
+                                    int64_t block_length) = 0;
 
   // Appends a partial CREATE record to the metadata part of container 'c'; the
   // newly created record has a unique id of 'block_id'. The record is corrupted
@@ -173,14 +183,16 @@ class LBMCorruptor {
     kTwoCreatesForSameBlockId
   };
   // Returns an error if a container could not be found.
-  virtual Status CreateMalformedRecord(
-      const Container* c, MalformedRecordType error_type, BlockRecordPB* record);
+  virtual Status CreateMalformedRecord(const Container* c,
+                                       MalformedRecordType error_type,
+                                       BlockRecordPB* record);
 
   Status CreateContainerDataPart(const std::string& unsuffixed_path);
 
   // Initialized in the constructor.
   Env* env_;
-  const std::vector<std::string> data_dirs_;
+  // Use DataDirManager to access the Dir objects conveniently.
+  DataDirManager* dd_manager_;
   mutable Random rand_;
   ObjectIdGenerator oid_generator_;
   MalformedRecordType max_malformed_types_;
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 05f814761..ce7fd97ee 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -30,6 +30,7 @@
 #include <random>
 #include <set>
 #include <string>
+#include <tuple>
 #include <type_traits>
 #include <unordered_map>
 #include <unordered_set>
@@ -39,6 +40,11 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
+#include <rocksdb/db.h>
+#include <rocksdb/iterator.h>
+#include <rocksdb/options.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
@@ -51,6 +57,7 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
@@ -69,6 +76,7 @@
 #include "kudu/util/threadpool.h"
 
 using kudu::pb_util::ReadablePBContainerFile;
+using std::make_tuple;
 using std::set;
 using std::string;
 using std::shared_ptr;
@@ -76,6 +84,8 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
+using strings::SkipEmpty;
+using strings::Split;
 using strings::Substitute;
 
 DECLARE_bool(cache_force_single_shard);
@@ -125,7 +135,21 @@ namespace internal {
 class LogBlockContainer;
 } // namespace internal
 
-class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterface<bool> {
+// Parameterize test cases:
+// +------------+---------------+
+// | encryption | block_manager |
+// +------------+---------------+
+// |    no      |     log       |
+// +------------+---------------+
+// |    no      |     logr      |
+// +------------+---------------+
+// |    yes     |     log       |
+// +------------+---------------+
+// |    yes     |     logr      |
+// +------------+---------------+
+class LogBlockManagerTest : public KuduTest,
+                            public ::testing::WithParamInterface<
+                                std::tuple<bool, string>> {
  public:
   LogBlockManagerTest() :
       test_tablet_name_("test_tablet"),
@@ -134,7 +158,8 @@ class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterfac
       //
       // Not strictly necessary except for TestDeleteFromContainerAfterMetadataCompaction.
       file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()) {
-    SetEncryptionFlags(GetParam());
+    SetEncryptionFlags(std::get<0>(GetParam()));
+    FLAGS_block_manager = std::get<1>(GetParam());
     CHECK_OK(file_cache_.Init());
     error_manager_ = make_scoped_refptr(new FsErrorManager());
     bm_ = CreateBlockManager(scoped_refptr<MetricEntity>());
@@ -162,13 +187,20 @@ class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterfac
 
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
-    CHECK_EQ(FLAGS_block_manager, "log");
-    return make_scoped_refptr(new LogBlockManagerNativeMeta(env_, dd_manager_.get(), error_manager_,
-                              &file_cache_, std::move(opts), fs::kDefaultTenantID));
+    if (FLAGS_block_manager == "log") {
+      return make_scoped_refptr(new LogBlockManagerNativeMeta(
+          env_, dd_manager_.get(), error_manager_,
+          &file_cache_, std::move(opts), fs::kDefaultTenantID));
+    }
+    CHECK_EQ(FLAGS_block_manager, "logr");
+    return make_scoped_refptr(new LogBlockManagerRdbMeta(
+          env_, dd_manager_.get(), error_manager_,
+          &file_cache_, std::move(opts), fs::kDefaultTenantID));
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = nullptr,
                             FsReport* report = nullptr,
+                            LBMCorruptor* corruptor = nullptr,
                             vector<string> test_data_dirs = {},
                             bool force = false) {
     PrepareDataDirs(&test_data_dirs);
@@ -189,6 +221,10 @@ class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterfac
                                                          DataDirManagerOptions(), &dd_manager_));
       RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
     }
+    if (corruptor) {
+      // Re-bind the corruptor to the newly created DataDirManager.
+      corruptor->ResetDataDirManager(dd_manager_.get());
+    }
     bm_ = CreateBlockManager(metric_entity, test_data_dirs);
     RETURN_NOT_OK(bm_->Open(report, BlockManager::MergeReport::NOT_REQUIRED, nullptr, nullptr));
     return Status::OK();
@@ -244,6 +280,7 @@ class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterfac
     ASSERT_TRUE(report.malformed_record_check->entries.empty());
     ASSERT_TRUE(report.misaligned_block_check->entries.empty());
     ASSERT_TRUE(report.partial_record_check->entries.empty());
+    ASSERT_TRUE(report.corrupted_rdb_record_check->entries.empty());
   }
 
   DataDirGroupPB test_group_pb_;
@@ -343,7 +380,24 @@ static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
   }
 }
 
-INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest, ::testing::Values(false, true));
+INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest,
+                         ::testing::Combine(
+                             ::testing::Values(false, true),
+                             ::testing::Values("log", "logr")));
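The table above enumerates the four (encryption, block_manager) combinations that ::testing::Combine expands to. As a plain-C++ sketch of the same expansion (ParamMatrix is an illustrative helper for this email, not part of gtest or Kudu):

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <vector>

// Enumerates the same parameter matrix that
// ::testing::Combine(::testing::Values(false, true),
//                    ::testing::Values("log", "logr")) produces.
std::vector<std::tuple<bool, std::string>> ParamMatrix() {
  std::vector<std::tuple<bool, std::string>> params;
  for (bool encryption : {false, true}) {
    for (const char* block_manager : {"log", "logr"}) {
      params.emplace_back(encryption, block_manager);
    }
  }
  return params;
}
```

Each tuple becomes one instantiation of LogBlockManagerTest, so every TEST_P body runs four times.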
+
+// Parameterize test cases:
+// +------------+---------------+
+// | encryption | block_manager |
+// +------------+---------------+
+// |    no      |     log       |
+// +------------+---------------+
+// |    yes     |     log       |
+// +------------+---------------+
+class LogBlockManagerNativeMetaTest : public LogBlockManagerTest {
+};
+INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerNativeMetaTest,
+    ::testing::Values(make_tuple(false, "log"),
+                      make_tuple(true, "log")));
 
 TEST_P(LogBlockManagerTest, MetricsTest) {
   MetricRegistry registry;
@@ -368,7 +422,7 @@ TEST_P(LogBlockManagerTest, MetricsTest) {
   // encryption header occupies one block on disk, so set FLAGS_log_container_max_size
   // to (2 * fs_block_size) when encryption is enabled and to fs_block_size when
   // encryption is disabled.
-  FLAGS_log_container_max_size = GetParam() ? 2 * fs_block_size : fs_block_size;
+  FLAGS_log_container_max_size = std::get<0>(GetParam()) ? 2 * fs_block_size : fs_block_size;
   // One block --> one container.
   unique_ptr<WritableBlock> writer;
   ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
@@ -667,7 +721,7 @@ TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
 //
 // Note that we rely on filesystem integrity to ensure that we do not lose
 // trailing, fsync()ed metadata.
-TEST_P(LogBlockManagerTest, TestMetadataTruncation) {
+TEST_P(LogBlockManagerNativeMetaTest, TestMetadataTruncation) {
   // Create several blocks.
   vector<BlockId> created_blocks;
   BlockId last_block_id;
@@ -1054,6 +1108,7 @@ TEST_P(LogBlockManagerTest, TestParseKernelRelease) {
 // However it still can be used to micro-optimize the startup process.
 TEST_P(LogBlockManagerTest, StartupBenchmark) {
   SKIP_IF_SLOW_NOT_ALLOWED();
+
   // Disable preflushing since this can slow down our writes. In particular,
   // since we write such small blocks in this test, each block will likely
   // begin on the same 4KB page as the prior one we wrote, and due to the
@@ -1070,7 +1125,7 @@ TEST_P(LogBlockManagerTest, StartupBenchmark) {
       test_dirs.emplace_back(test_dir_ + "/" + std::to_string(i));
     }
     // Re-open block manager to place data on multiple data directories.
-    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs, /* force= */ true));
+    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, nullptr, test_dirs, /* force= */ true));
   }
 
   // Creates FLAGS_startup_benchmark_batch_count_for_testing *
@@ -1126,8 +1181,9 @@ TEST_P(LogBlockManagerTest, StartupBenchmark) {
 
   for (int i = 0; i < FLAGS_startup_benchmark_reopen_times; i++) {
     SCOPED_LOG_TIMING(INFO, "reopening block manager");
-    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs));
+    ASSERT_OK(ReopenBlockManager(nullptr, nullptr, nullptr, test_dirs));
   }
+  LOG(INFO) << "Test on --block_manager=" << FLAGS_block_manager;
 }
 #endif
 
@@ -1240,7 +1296,7 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByBlockNum) {
   NO_FATALS(AssertNumContainers(4));
 }
 
-TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
+TEST_P(LogBlockManagerNativeMetaTest, TestContainerBlockLimitingByMetadataSize) {
   const int kNumBlocks = 1000;
 
   // Creates 'kNumBlocks' blocks with minimal data.
@@ -1276,7 +1332,7 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
   NO_FATALS(AssertNumContainers(4));
 }
 
-TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompaction) {
+TEST_P(LogBlockManagerNativeMetaTest, TestContainerBlockLimitingByMetadataSizeWithCompaction) {
   const int kNumBlocks = 2000;
   const int kNumThreads = 10;
   const double kLiveBlockRatio = 0.1;
@@ -1287,9 +1343,9 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompacti
     // Creates 'kNumBlocks' blocks.
     for (int i = 0; i < kNumBlocks; i++) {
       unique_ptr<WritableBlock> block;
-      RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block));
-      RETURN_NOT_OK(block->Append("aaaa"));
-      RETURN_NOT_OK(block->Close());
+      CHECK_OK(bm_->CreateBlock(test_block_opts_, &block));
+      CHECK_OK(block->Append("aaaa"));
+      CHECK_OK(block->Close());
       ids.push_back(block->id());
     }
 
@@ -1302,9 +1358,7 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompacti
       }
       deletion_transaction->AddDeletedBlock(id);
     }
-    RETURN_NOT_OK(deletion_transaction->CommitDeletedBlocks(nullptr));
-
-    return Status::OK();
+    CHECK_OK(deletion_transaction->CommitDeletedBlocks(nullptr));
   };
 
   // Create a thread pool to create and delete blocks.
@@ -1354,7 +1408,7 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompacti
   NO_FATALS(GetContainerMetadataFiles(&metadata_files));
   bool exist_larger_one = false;
   for (const auto& metadata_file : metadata_files) {
-    uint64_t file_size;
+    uint64_t file_size = 0;
     NO_FATALS(env_->GetFileSize(metadata_file, &file_size));
     if (file_size > FLAGS_log_container_metadata_max_size *
                         FLAGS_log_container_metadata_size_before_compact_ratio) {
@@ -1377,7 +1431,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add a mixture of regular and misaligned blocks to it.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   int num_misaligned_blocks = 0;
   for (int i = 0; i < kNumBlocks; i++) {
@@ -1388,7 +1442,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
       // container metadata writers do not expect the metadata files to have
       // been changed underneath them.
       FsReport report;
-      ASSERT_OK(ReopenBlockManager(nullptr, &report));
+      ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
       ASSERT_FALSE(report.HasFatalErrors());
       num_misaligned_blocks++;
     } else {
@@ -1415,7 +1469,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
     }
   }
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors()) << report.ToString();
   ASSERT_EQ(num_misaligned_blocks, report.misaligned_block_check->entries.size());
   for (const auto& mb : report.misaligned_block_check->entries) {
@@ -1433,14 +1487,16 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
         deletion_transaction->AddDeletedBlock(id);
       }
     }
-    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(nullptr));
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+    ASSERT_FALSE(deleted.empty());
   }
 
   // Wait for the block manager to punch out all of the holes. It's easiest to
   // do this by reopening it; shutdown will wait for outstanding hole punches.
   //
   // On reopen, some misaligned blocks should be gone from the report.
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_GT(report.misaligned_block_check->entries.size(), 0);
   ASSERT_LT(report.misaligned_block_check->entries.size(), num_misaligned_blocks);
@@ -1494,13 +1550,13 @@ TEST_P(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
   NO_FATALS(GetContainerNames(&container_names));
 
   // Corrupt one container.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   ASSERT_OK(corruptor->PreallocateFullContainer());
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
@@ -1522,7 +1578,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
 
   // Force our single container to become full once created.
-  FLAGS_log_container_max_size = GetParam() ? 4096 : 0;
+  FLAGS_log_container_max_size = std::get<0>(GetParam()) ? 4096 : 0;
 
   // Force the test to measure extra space in unpunched holes, not in the
   // preallocation buffer.
@@ -1538,7 +1594,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &initial_file_size_on_disk));
 
   // Add some "unpunched blocks" to the container.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor->AddUnpunchedBlockToFullContainer());
@@ -1550,7 +1606,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
@@ -1566,12 +1622,23 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   // Wait for the block manager to punch out all of the holes (done as part of
   // repair at startup). It's easiest to do this by reopening it; shutdown will
   // wait for outstanding hole punches.
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
+  if (FLAGS_block_manager == "logr" && !FLAGS_encrypt_data_at_rest) {
+    // In this case the data file is too small (zero bytes here), so the container
+    // is added to incomplete_container_check; it is repaired automatically at bootstrap.
+    ASSERT_FALSE(report.HasFatalErrors());
+    ASSERT_EQ(1, report.incomplete_container_check->entries.size());
+    const auto& ics = report.incomplete_container_check->entries[0];
+    ASSERT_EQ(container, ics.container);
+    ASSERT_TRUE(ics.repaired);
+    report.incomplete_container_check->entries.clear();
+    ASSERT_FALSE(env_->FileExists(data_file));
+  } else {
+    // File size on disk should return to its initial value post-repair.
+    ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
+    ASSERT_EQ(initial_file_size_on_disk, file_size_on_disk);
+  }
   NO_FATALS(AssertEmptyReport(report));
-
-  // File size should be 0 post-repair.
-  ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
-  ASSERT_EQ(initial_file_size_on_disk, file_size_on_disk);
 }
 
 TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
@@ -1580,7 +1647,7 @@ TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
   // Create some incomplete containers. The corruptor will select between
   // several variants of "incompleteness" at random (see
   // LBMCorruptor::CreateIncompleteContainer() for details).
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumContainers; i++) {
     ASSERT_OK(corruptor->CreateIncompleteContainer());
@@ -1591,7 +1658,7 @@ TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumContainers, report.incomplete_container_check->entries.size());
   unordered_set<string> container_name_set(container_names.begin(),
@@ -1618,7 +1685,7 @@ TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
   // Add some malformed records. The corruptor will select between
   // several variants of "malformedness" at random (see
   // LBMCorruptor::AddMalformedRecordToContainer for details).
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor->AddMalformedRecordToContainer());
@@ -1626,7 +1693,7 @@ TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_TRUE(report.HasFatalErrors());
   ASSERT_EQ(kNumRecords, report.malformed_record_check->entries.size());
   for (const auto& mr : report.malformed_record_check->entries) {
@@ -1648,7 +1715,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add some misaligned blocks.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumBlocks; i++) {
     ASSERT_OK(corruptor->AddMisalignedBlockToContainer());
@@ -1656,7 +1723,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumBlocks, report.misaligned_block_check->entries.size());
   uint64_t fs_block_size;
@@ -1668,7 +1735,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
+TEST_P(LogBlockManagerNativeMetaTest, TestRepairPartialRecords) {
   const int kNumContainers = 50;
   const int kNumRecords = 10;
 
@@ -1687,7 +1754,7 @@ TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
   ASSERT_EQ(kNumContainers, container_names.size());
 
   // Add some partial records.
-  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), SeedRandom());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_.get(), SeedRandom());
   ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumRecords; i++) {
     ASSERT_OK(corruptor->AddPartialRecordToContainer());
@@ -1695,7 +1762,7 @@ TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
 
   // Check the report.
   FsReport report;
-  ASSERT_OK(ReopenBlockManager(nullptr, &report));
+  ASSERT_OK(ReopenBlockManager(nullptr, &report, corruptor.get()));
   ASSERT_FALSE(report.HasFatalErrors());
   ASSERT_EQ(kNumRecords, report.partial_record_check->entries.size());
   unordered_set<string> container_name_set(container_names.begin(),
@@ -1745,7 +1812,7 @@ TEST_P(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
   ASSERT_FALSE(env_->FileExists(metadata_file_name));
 }
 
-TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
+TEST_P(LogBlockManagerNativeMetaTest, TestCompactFullContainerMetadataAtStartup) {
   // With this ratio, the metadata of a full container comprised of half dead
   // blocks will be compacted at startup.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.50;
@@ -1809,7 +1876,7 @@ TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
 //
 // The bug was related to a stale file descriptor left in the file_cache, so
 // this test explicitly targets that scenario.
-TEST_P(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
+TEST_P(LogBlockManagerNativeMetaTest, TestDeleteFromContainerAfterMetadataCompaction) {
   // Compact aggressively.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
   // Use a single shard so that we have an accurate max cache capacity
@@ -2032,7 +2099,7 @@ TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
     }
     {
       // The last block makes a full container.
-      FLAGS_log_container_max_size = GetParam() ? 4097 : 1;
+      FLAGS_log_container_max_size = std::get<0>(GetParam()) ? 4097 : 1;
       unique_ptr<WritableBlock> writer;
       ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
       blocks.emplace_back(writer->id());
@@ -2052,10 +2119,12 @@ TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
 
     // Check the container files.
     string data_file_name;
-    string metadata_file_name;
     NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
-    NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name));
-
+    string metadata_file_name;
+    // The metadata file exists only when --block_manager=log.
+    if (FLAGS_block_manager == "log") {
+      NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name));
+    }
     // Open the last block for reading.
     unique_ptr<ReadableBlock> reader;
     ASSERT_OK(bm_->OpenBlock(blocks[block_num-1], &reader));
@@ -2105,7 +2174,9 @@ TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
 
     // The container files should have been deleted.
     ASSERT_FALSE(env_->FileExists(data_file_name));
-    ASSERT_FALSE(env_->FileExists(metadata_file_name));
+    if (FLAGS_block_manager == "log") {
+      ASSERT_FALSE(env_->FileExists(metadata_file_name));
+    }
   };
 
   for (int i = 1; i < 4; ++i) {
@@ -2175,7 +2246,7 @@ TEST_P(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) {
   Process(false);
 }
 
-TEST_P(LogBlockManagerTest, TestHalfPresentContainer) {
+TEST_P(LogBlockManagerNativeMetaTest, TestHalfPresentContainer) {
   BlockId block_id;
   string data_file_name;
   string metadata_file_name;
@@ -2428,5 +2499,331 @@ TEST_P(LogBlockManagerTest, TestHalfPresentContainer) {
   }
 }
 
+// Parameterize test cases:
+// +------------+---------------+
+// | encryption | block_manager |
+// +------------+---------------+
+// |    no      |     logr      |
+// +------------+---------------+
+// |    yes     |     logr      |
+// +------------+---------------+
+class LogBlockManagerRdbMetaTest : public LogBlockManagerTest {
+};
+INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerRdbMetaTest,
+    ::testing::Values(make_tuple(false, "logr"),
+                      make_tuple(true, "logr")));
+
+TEST_P(LogBlockManagerRdbMetaTest, TestHalfPresentContainer) {
+  BlockId block_id;
+  string data_file_name;
+  Dir* pdir = nullptr;
+  string container_name;
+  MetricRegistry registry;
+  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
+
+  const auto CreateContainer = [&] (bool create_block = false) {
+    ASSERT_OK(ReopenBlockManager(entity));
+    unique_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
+    block_id = writer->id();
+    if (create_block) {
+      ASSERT_OK(writer->Append("a"));
+    }
+    ASSERT_OK(writer->Finalize());
+    ASSERT_OK(writer->Close());
+
+    // Get the data file name and container name.
+    NO_FATALS(GetOnlyContainerDataFile(&data_file_name));
+    pdir = dd_manager_->FindDirByFullPathForTests(data_file_name);
+    ASSERT_NE(pdir, nullptr);
+    string file_name = BaseName(data_file_name);
+    vector<string> tmp = Split(file_name, ".", SkipEmpty());
+    ASSERT_FALSE(tmp.empty());
+    container_name = tmp[0];
+  };
+
+  const auto CreateDataFile = [&] () {
+    // We're often recreating an existing file, so we must invalidate any
+    // entry in the file cache first.
+    file_cache_.Invalidate(data_file_name);
+
+    unique_ptr<WritableFile> data_file_writer;
+    WritableFileOptions opts;
+    opts.is_sensitive = true;
+    ASSERT_OK(env_->NewWritableFile(opts, data_file_name, &data_file_writer));
+    data_file_writer->Close();
+  };
+
+  const auto DeleteBlock = [&] () {
+    shared_ptr<BlockDeletionTransaction> transaction = bm_->NewDeletionTransaction();
+    transaction->AddDeletedBlock(block_id);
+    ASSERT_OK(transaction->CommitDeletedBlocks(nullptr));
+    transaction.reset();
+    dd_manager_->WaitOnClosures();
+  };
+
+  const auto CheckOK = [&] () {
+    FsReport report;
+    ASSERT_OK(ReopenBlockManager(entity, &report));
+    ASSERT_FALSE(report.HasFatalErrors());
+    NO_FATALS(AssertEmptyReport(report));
+    pdir = dd_manager_->FindDirByFullPathForTests(data_file_name);
+    ASSERT_NE(pdir, nullptr);
+  };
+
+  const auto CheckRepaired = [&] (const string& type) {
+    FsReport report;
+    ASSERT_OK(ReopenBlockManager(entity, &report));
+    ASSERT_FALSE(report.HasFatalErrors());
+    if (type == "incomplete_container_check") {
+      ASSERT_EQ(1, report.incomplete_container_check->entries.size());
+      report.incomplete_container_check->entries.clear();
+    } else if (type == "corrupted_rdb_record_check") {
+      ASSERT_EQ(1, report.corrupted_rdb_record_check->entries.size());
+      report.corrupted_rdb_record_check->entries.clear();
+    }
+    NO_FATALS(AssertEmptyReport(report));
+    pdir = dd_manager_->FindDirByFullPathForTests(data_file_name);
+    ASSERT_NE(pdir, nullptr);
+  };
+
+  const auto WriteBadMetadata = [&] (
+      const string& container_name, const BlockId& block_id) {
+    string key(LogBlockManagerRdbMeta::ConstructRocksDBKey(container_name, block_id));
+    rocksdb::Slice slice_key(key);
+    string value = "bad_value";
+    rocksdb::Status s = down_cast<RdbDir*>(pdir)->rdb()->Put(
+        rocksdb::WriteOptions(), slice_key, rocksdb::Slice(value));
+    ASSERT_OK(FromRdbStatus(s));
+  };
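ConstructRocksDBKey() used above is defined in the Kudu sources; purely as a sketch, the "<container_id>.<block_id>" scheme from the commit message could look like the helper below (MakeRdbKey and the decimal block-id encoding are assumptions of this illustration, not Kudu's actual API):

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Illustrative stand-in for the "<container_id>.<block_id>" key scheme:
// all records of one container share the container-id prefix, so they sort
// contiguously in RocksDB and can be scanned or deleted as a single range.
std::string MakeRdbKey(const std::string& container_id, uint64_t block_id) {
  return container_id + "." + std::to_string(block_id);
}
```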
+
+  const auto MetadataEntriesCount = [&] (const string& container_name) -> int {
+    rocksdb::Slice begin_key = container_name;
+    int count = 0;
+    auto* rdb_dir = down_cast<RdbDir*>(pdir);
+    unique_ptr<rocksdb::Iterator> it(rdb_dir->rdb()->NewIterator(
+        rocksdb::ReadOptions(), rdb_dir->rdb()->DefaultColumnFamily()));
+    it->Seek(begin_key);
+    while (it->Valid() && it->key().starts_with(begin_key)) {
+      count++;
+      it->Next();
+    }
+    CHECK_OK(FromRdbStatus(it->status()));
+    return count;
+  };
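MetadataEntriesCount() above relies on RocksDB's ordered iteration: Seek() to the container prefix, then walk forward while keys still start with it. The same pattern can be sketched against std::map (also key-ordered) so it runs without RocksDB; CountWithPrefix is an illustrative helper, not Kudu code:

```cpp
#include <cassert>
#include <map>
#include <string>

// Counts entries whose key starts with 'prefix' in an ordered map -- the
// same Seek-then-scan pattern the rocksdb::Iterator loop above uses.
int CountWithPrefix(const std::map<std::string, std::string>& kv,
                    const std::string& prefix) {
  int count = 0;
  // lower_bound() plays the role of rocksdb::Iterator::Seek().
  for (auto it = kv.lower_bound(prefix);
       it != kv.end() && it->first.compare(0, prefix.size(), prefix) == 0;
       ++it) {
    ++count;
  }
  return count;
}
```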
+
+  // Case1: the metadata in rdb is empty and
+  //        the size of the existing data file is 0.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer());
+
+    // Delete the metadata part.
+    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, container_name));
+
+    // The container has been repaired.
+    NO_FATALS(CheckRepaired("incomplete_container_check"));
+
+    // Data file has been removed, metadata is empty.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case2: the metadata in rdb is empty and
+  //        the size of the existing data file is >0.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the metadata part.
+    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, container_name));
+
+    // The container has been repaired.
+    NO_FATALS(CheckOK());
+
+    // Data file exists, but metadata is empty.
+    ASSERT_TRUE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+
+    // Delete the data file to keep path clean.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+  }
+
+  // Case3: the metadata in rdb is empty and
+  //        the data file has gone missing.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer());
+
+    // Delete both the data file and the metadata part, but keep the path.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, container_name));
+
+    // The container does not exist.
+    NO_FATALS(CheckOK());
+
+    // Data file has been removed, metadata is empty.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case4: the metadata in rdb has a bad value and
+  //        the size of the existing data file is 0.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer());
+
+    // Delete the metadata part.
+    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, container_name));
+
+    // Create a metadata record with bad value.
+    WriteBadMetadata(container_name, block_id);
+    ASSERT_EQ(1, MetadataEntriesCount(container_name));
+
+    // The container has been repaired.
+    NO_FATALS(CheckRepaired("incomplete_container_check"));
+
+    // Data file has been removed, metadata is empty.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case5: the metadata in rdb has bad value and
+  //        the size of the existing data file is >0.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the metadata part.
+    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, container_name));
+
+    // Create a metadata record with bad value.
+    WriteBadMetadata(container_name, block_id);
+    ASSERT_EQ(1, MetadataEntriesCount(container_name));
+
+    // The container has been repaired.
+    NO_FATALS(CheckRepaired("corrupted_rdb_record_check"));
+
+    // Data file exists, metadata is empty.
+    ASSERT_TRUE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case6: the existing metadata part has no live blocks and
+  //        the data file has gone missing.
+  {
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the only block.
+    NO_FATALS(DeleteBlock());
+
+    // Delete the data file.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+
+    // The container does not exist.
+    NO_FATALS(CheckOK());
+
+    // Data file has been removed, metadata is empty.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case7: the existing metadata part has no live blocks and
+  //        the size of the existing data file is 0.
+  {
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the only block.
+    NO_FATALS(DeleteBlock());
+
+    // Delete the data file.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+
+    // Create an empty data file.
+    NO_FATALS(CreateDataFile());
+
+    // The container has been repaired.
+    NO_FATALS(CheckRepaired("incomplete_container_check"));
+
+    // Data file has been removed, metadata is empty.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case8: the existing metadata part has no live blocks and
+  //        the size of the existing data file is >0.
+  {
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the only block.
+    NO_FATALS(DeleteBlock());
+
+    // The container is ok.
+    NO_FATALS(CheckOK());
+
+    // Data file exists, but metadata is empty.
+    ASSERT_TRUE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+
+    // Delete the data file to keep path clean.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+  }
+
+  // TODO(yingchun): need to clean up dead metadata
+  // Case9: the existing metadata part has live blocks and
+  //        the data file has gone missing.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the data file.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+
+    // The container is OK (in fact, it is considered nonexistent, because its existence is
+    // judged by the presence of the data file).
+    NO_FATALS(CheckOK());
+
+    // Data file has been removed, metadata has 1 record.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(1, MetadataEntriesCount(container_name));
+
+    // Delete the metadata part to keep it clean.
+    ASSERT_OK(LogBlockManagerRdbMeta::DeleteContainerMetadata(pdir, container_name));
+  }
+
+  // Case10: the existing metadata part has live blocks and
+  //         the size of the existing data file is 0.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer(true));
+
+    // Delete the data file.
+    ASSERT_OK(env_->DeleteFile(data_file_name));
+
+    // Create an empty data file.
+    NO_FATALS(CreateDataFile());
+
+    // The container has been repaired.
+    NO_FATALS(CheckRepaired("incomplete_container_check"));
+
+    // Data file has been removed, metadata is empty.
+    ASSERT_FALSE(env_->FileExists(data_file_name));
+    ASSERT_EQ(0, MetadataEntriesCount(container_name));
+  }
+
+  // Case11: the existing metadata part has live blocks and
+  //         the size of the existing data file is >0.
+  {
+    // Create a container.
+    NO_FATALS(CreateContainer(true));
+
+    // The container is ok.
+    NO_FATALS(CheckOK());
+
+    ASSERT_TRUE(env_->FileExists(data_file_name));
+    ASSERT_EQ(1, MetadataEntriesCount(container_name));
+  }
+}
+
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index d8b43e742..729393474 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -39,6 +39,12 @@
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <rocksdb/db.h>
+#include <rocksdb/iterator.h>
+#include <rocksdb/options.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
+#include <rocksdb/write_batch.h>
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/data_dirs.h"
@@ -51,6 +57,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -111,6 +118,12 @@ DEFINE_uint64(log_container_preallocate_bytes, 32LU * 1024 * 1024,
               "creating new blocks. Set to 0 to disable preallocation");
 TAG_FLAG(log_container_preallocate_bytes, advanced);
 
+DEFINE_uint64(log_container_rdb_delete_batch_count, 256,
+              "The batch count for deleting blocks in one operation against RocksDB. It is only "
+              "effective when --block_manager='logr'");
+TAG_FLAG(log_container_rdb_delete_batch_count, experimental);
+TAG_FLAG(log_container_rdb_delete_batch_count, advanced);
+
 DEFINE_double(log_container_excess_space_before_cleanup_fraction, 0.10,
               "Additional fraction of a log container's calculated size that "
               "must be consumed on disk before the container is considered to "
@@ -206,6 +219,7 @@ METRIC_DEFINE_gauge_uint64(server, log_block_manager_processed_containers_startu
 using kudu::fs::internal::LogBlock;
 using kudu::fs::internal::LogBlockContainer;
 using kudu::fs::internal::LogBlockContainerNativeMeta;
+using kudu::fs::internal::LogBlockContainerRdbMeta;
 using kudu::fs::internal::LogBlockDeletionTransaction;
 using kudu::fs::internal::LogWritableBlock;
 using kudu::pb_util::ReadablePBContainerFile;
@@ -220,6 +234,8 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
+using strings::SkipEmpty;
+using strings::Split;
 using strings::Substitute;
 
 namespace kudu {
@@ -882,6 +898,92 @@ class LogBlockContainerNativeMeta final : public LogBlockContainer {
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainerNativeMeta);
 };
 
+////////////////////////////////////////////////////////////
+// LogBlockContainerRdbMeta
+////////////////////////////////////////////////////////////
+
+// The metadata part is stored in a directory-wide shared RocksDB instance, located in
+// "<dir>/rdb/", the keys are prefixed by "<id>.", where the "id" is the container's ID.
+class LogBlockContainerRdbMeta final : public LogBlockContainer {
+ public:
+  // Creates a new LogBlockContainer managed by 'block_manager' in 'dir'; 'container' will be
+  // set to the newly created container.
+  //
+  // Returns Status::OK() if created successfully, otherwise returns an error.
+  static Status Create(LogBlockManager* block_manager,
+                       Dir* dir,
+                       LogBlockContainerRefPtr* container);
+
+  // Opens an existing LogBlockContainer in 'dir'.
+  //
+  // Every container is comprised of two parts: "<dir>/<id>.data" and
+  // metadata (i.e. key-value pairs in the RocksDB instance). Together,
+  // 'dir' and 'id' fully describe both parts.
+  //
+  // Returns Status::Aborted() in the case that the data part appears to have no
+  // data (e.g. due to a crash just after creating it but before writing any
+  // records). This is recorded in 'report'.
+  static Status Open(LogBlockManager* block_manager,
+                     Dir* dir,
+                     FsReport* report,
+                     const string& id,
+                     LogBlockContainerRefPtr* container);
+
+  // Checks whether the container is healthy.
+  // Only the data part is checked; the metadata part (i.e. the RocksDB instance) has already
+  // been checked when opening the container, since opening the data directory fails
+  // immediately if the instance is unhealthy.
+  //
+  // OK: the container's data file exists;
+  // Aborted: the container will be repaired later;
+  // NotFound: the container's data file has gone missing;
+  //
+  // Note: the status here only represents the result of the check.
+  static Status CheckContainerFiles(LogBlockManager* block_manager,
+                                    FsReport* report,
+                                    const Dir* dir,
+                                    const string& common_path,
+                                    const string& data_path);
+
+  LogBlockContainerRdbMeta(LogBlockManager* block_manager,
+                           Dir* data_dir,
+                           string id,
+                           shared_ptr<RWFile> data_file,
+                           uint64_t data_file_size);
+
+  ~LogBlockContainerRdbMeta() override;
+
+  Status RemoveBlockIdsFromMetadata(const vector<LogBlockRefPtr>& lbs,
+                                    vector<BlockId>* deleted_block_ids) override;
+
+  Status AddBlockIdsToMetadata(const vector<LogWritableBlock*>& blocks) override;
+
+  Status SyncMetadata() override;
+
+  Status ProcessRecords(
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      LogBlockManager::BlockRecordMap* live_block_records,
+      vector<LogBlockRefPtr>* dead_blocks,
+      uint64_t* max_block_id,
+      ProcessRecordType type) override;
+
+ private:
+  void ProcessDeleteRecord(
+      BlockRecordPB* /*record*/,
+      FsReport* /*report*/,
+      LogBlockManager::UntrackedBlockMap* /*live_blocks*/,
+      LogBlockManager::BlockRecordMap* /*live_block_records*/,
+      vector<LogBlockRefPtr>* /*dead_blocks*/,
+      ProcessRecordType /*type*/) override {
+    LOG(FATAL) << Substitute("Container $0 contains a DELETE type record", ToString());
+  }
+
+  rocksdb::DB* const rdb_;
+
+  DISALLOW_COPY_AND_ASSIGN(LogBlockContainerRdbMeta);
+};
+
 #define CONTAINER_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
  HANDLE_DISK_FAILURE(s_, block_manager_->error_manager_->RunErrorNotificationCb( \
@@ -986,6 +1088,7 @@ void LogBlockContainerNativeMeta::CompactMetadata() {
   report.malformed_record_check.emplace();
   report.misaligned_block_check.emplace();
   report.partial_record_check.emplace();
+  report.corrupted_rdb_record_check.emplace();
 
   LogBlockManager::UntrackedBlockMap live_blocks;
   LogBlockManager::BlockRecordMap live_block_records;
@@ -1775,6 +1878,300 @@ void LogBlockContainer::ContainerDeletionAsync(int64_t offset, int64_t length) {
                             data_dir()->dir()));
 }
 
+Status LogBlockContainerRdbMeta::Create(LogBlockManager* block_manager,
+                                        Dir* dir,
+                                        LogBlockContainerRefPtr* container) {
+  DCHECK(container);
+  DCHECK(!container->get());
+  string id;
+  string common_path;
+  string data_path;
+  Status metadata_status;
+  Status data_status;
+  shared_ptr<RWFile> data_file;
+
+  // Repeat in the event of a container id collision (unlikely).
+  //
+  // When looping, we delete any created-and-orphaned records.
+  do {
+    id = block_manager->oid_generator()->Next();
+    common_path = JoinPathSegments(dir->dir(), id);
+    data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
+    metadata_status = LogBlockManagerRdbMeta::DeleteContainerMetadata(dir, id);
+    WARN_NOT_OK(metadata_status, "could not delete all orphaned metadata");
+    if (PREDICT_TRUE(block_manager->file_cache_)) {
+      if (data_file) {
+        WARN_NOT_OK(block_manager->file_cache_->DeleteFile(data_path),
+                    "could not delete orphaned data file thru file cache");
+      }
+      data_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>(
+          data_path, &data_file);
+    } else {
+      if (data_file) {
+        WARN_NOT_OK(block_manager->env()->DeleteFile(data_path),
+                    "could not delete orphaned data file");
+      }
+      unique_ptr<RWFile> rwf;
+      RWFileOptions rw_opts;
+
+      rw_opts.mode = Env::MUST_CREATE;
+      rw_opts.is_sensitive = true;
+      data_status = block_manager->env()->NewRWFile(
+          rw_opts, data_path, &rwf);
+      data_file.reset(rwf.release());
+    }
+  } while (PREDICT_FALSE(data_status.IsAlreadyPresent()));
+  if (metadata_status.ok() && data_status.ok()) {
+    container->reset(new LogBlockContainerRdbMeta(block_manager,
+                                                  dir,
+                                                  id,
+                                                  std::move(data_file),
+                                                  0 /*data_file_size*/));
+    VLOG(1) << "Created log block container " << (*container)->ToString();
+  }
+
+  // Prefer metadata status (arbitrarily).
+  auto em = block_manager->error_manager();
+  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
+  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
+  return !metadata_status.ok() ? metadata_status : data_status;
+}
+
+Status LogBlockContainerRdbMeta::Open(LogBlockManager* block_manager,
+                                      Dir* dir,
+                                      FsReport* report,
+                                      const string& id,
+                                      LogBlockContainerRefPtr* container) {
+  DCHECK(container);
+  DCHECK(!container->get());
+  string common_path = JoinPathSegments(dir->dir(), id);
+  string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
+  RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir, common_path, data_path));
+
+  // Open the existing data file for writing.
+  shared_ptr<RWFile> data_file;
+  if (PREDICT_TRUE(block_manager->file_cache_)) {
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(
+        block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(data_path, &data_file));
+  } else {
+    RWFileOptions opts;
+    opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
+    unique_ptr<RWFile> rwf;
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(
+        opts, data_path, &rwf));
+    data_file.reset(rwf.release());
+  }
+
+  uint64_t data_file_size;
+  RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
+
+  // Create the in-memory container and populate it.
+  LogBlockContainerRefPtr open_container(new LogBlockContainerRdbMeta(
+      block_manager, dir, id, std::move(data_file), data_file_size));
+  VLOG(1) << "Opened log block container " << open_container->ToString();
+  container->swap(open_container);
+  return Status::OK();
+}
+
+Status LogBlockContainerRdbMeta::CheckContainerFiles(LogBlockManager* block_manager,
+                                                     FsReport* report,
+                                                     const Dir* dir,
+                                                     const string& common_path,
+                                                     const string& data_path) {
+  Env* env = block_manager->env();
+  uint64_t data_size = 0;
+  Status s_data = env->GetFileSize(data_path, &data_size);
+  if (!s_data.ok() && !s_data.IsNotFound()) {
+    s_data = s_data.CloneAndPrepend("unable to determine data file size");
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(s_data);
+  }
+
+  const auto kEncryptionHeaderSize = env->GetEncryptionHeaderSize();
+
+  // TODO(yingchun): In the case that the data file doesn't exist but metadata records in
+  //                 RocksDB do, the garbage data in RocksDB has no chance to be cleaned up,
+  //                 because the container is not found (since the data file is not found)
+  //                 when loading containers. Consider cleaning up this garbage data.
+  // Check data file exists and has valid length.
+  if (PREDICT_FALSE(data_size <= kEncryptionHeaderSize)) {
+    report->incomplete_container_check->entries.emplace_back(common_path);
+    return Status::Aborted(Substitute("orphaned empty or invalid length file $0", common_path));
+  }
+
+  return Status::OK();
+}
+
+LogBlockContainerRdbMeta::LogBlockContainerRdbMeta(
+    LogBlockManager* block_manager,
+    Dir* data_dir,
+    string id,
+    shared_ptr<RWFile> data_file,
+    uint64_t data_file_size)
+    : LogBlockContainer(block_manager, data_dir, std::move(id),
+                        std::move(data_file), data_file_size),
+      rdb_(down_cast<RdbDir*>(data_dir)->rdb()) {
+}
+
+LogBlockContainerRdbMeta::~LogBlockContainerRdbMeta() {
+  if (dead()) {
+    CHECK(!block_manager_->opts_.read_only);
+    string data_file_name = data_file_->filename();
+    string data_failure_msg =
+        "Could not delete dead container data file " + data_file_name;
+    CONTAINER_DISK_FAILURE(LogBlockManagerRdbMeta::DeleteContainerMetadata(data_dir_, id_),
+                           "Could not delete dead container metadata records " + id_);
+    if (PREDICT_TRUE(block_manager_->file_cache_)) {
+      CONTAINER_DISK_FAILURE(block_manager_->file_cache_->DeleteFile(data_file_name),
+                             data_failure_msg);
+    } else {
+      CONTAINER_DISK_FAILURE(block_manager_->env_->DeleteFile(data_file_name),
+                             data_failure_msg);
+    }
+  }
+  data_file_.reset();
+}
+
+Status LogBlockContainerRdbMeta::ProcessRecords(
+    FsReport* report,
+    LogBlockManager::UntrackedBlockMap* live_blocks,
+    LogBlockManager::BlockRecordMap* live_block_records,
+    vector<LogBlockRefPtr>* dead_blocks,
+    uint64_t* max_block_id,
+    ProcessRecordType type) {
+  int count = 0;
+  SCOPED_LOG_SLOW_EXECUTION(INFO, 100, Substitute("process $0 records in $1",
+                                                  count, data_dir_->dir()));
+  rocksdb::Slice begin_key = id_;
+  string next_id = ObjectIdGenerator::NextOf(id_);
+  rocksdb::Slice end_key = next_id;
+  rocksdb::ReadOptions scan_opt;
+  // Take advantage of Prefix-Seek, see https://github.com/facebook/rocksdb/wiki/Prefix-Seek.
+  scan_opt.prefix_same_as_start = true;
+  scan_opt.total_order_seek = false;
+  scan_opt.auto_prefix_mode = false;
+  scan_opt.readahead_size = 2 << 20;  // TODO(yingchun): make this configurable.
+  scan_opt.iterate_upper_bound = &end_key;
+
+  uint64_t data_file_size = 0;
+  BlockRecordPB record;
+  unique_ptr<rocksdb::Iterator> it(rdb_->NewIterator(scan_opt));
+  it->Seek(begin_key);
+  while (it->Valid() && it->key().starts_with(begin_key)) {
+    if (PREDICT_FALSE(!record.ParseFromArray(it->value().data(), it->value().size()))) {
+      report->corrupted_rdb_record_check->entries.emplace_back(ToString(), it->key().ToString());
+      LOG(WARNING) << Substitute("Parse metadata record in rocksdb failed, path: $0, key: $1",
+                                 ToString(),
+                                 it->key().ToString());
+      it->Next();
+      continue;
+    }
+
+    RETURN_NOT_OK(ProcessRecord(&record, report,
+                                live_blocks, live_block_records, dead_blocks,
+                                &data_file_size, max_block_id, type));
+    it->Next();
+    count++;
+  }
+
+  Status s = FromRdbStatus(it->status());
+  if (PREDICT_FALSE(!s.ok())) {
+    WARN_NOT_OK(s, Substitute("failed to read metadata records from RocksDB $0", ToString()));
+    HandleError(s);
+    return s;
+  }
+
+  return Status::OK();
+}
+
+Status LogBlockContainerRdbMeta::RemoveBlockIdsFromMetadata(
+    const vector<LogBlockRefPtr>& lbs, vector<BlockId>* deleted_block_ids) {
+  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
+  // Note: We don't check for sufficient disk space for metadata writes in
+  // order to allow for block deletion on full disks.
+  DCHECK(deleted_block_ids);
+  // Performing deletes in a batch has better performance than single deletes.
+  rocksdb::WriteBatch batch;
+  // 'keys' keeps the data referenced by the Slices in 'batch' alive.
+  vector<string> keys;
+  keys.reserve(lbs.size());
+  for (const auto& lb : lbs) {
+    deleted_block_ids->emplace_back(lb->block_id());
+
+    // Construct key.
+    keys.emplace_back(LogBlockManagerRdbMeta::ConstructRocksDBKey(id_, lb->block_id()));
+
+    // Add the delete operation to batch.
+    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(batch.Delete(rocksdb::Slice(keys.back()))));
+
+    // Tune --log_container_rdb_delete_batch_count to achieve better performance.
+    if (batch.Count() >= FLAGS_log_container_rdb_delete_batch_count) {
+      RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(rdb_->Write({}, &batch)));
+      batch.Clear();
+      keys.clear();
+    }
+  }
+
+  if (batch.Count() > 0) {
+    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(rdb_->Write({}, &batch)));
+  }
+
+  return Status::OK();
+}
+
+Status LogBlockContainerRdbMeta::AddBlockIdsToMetadata(
+    const vector<LogWritableBlock*>& blocks) {
+  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
+  BlockRecordPB record;
+  record.set_op_type(CREATE);
+  record.set_timestamp_us(GetCurrentTimeMicros());
+
+  // Performing puts in a batch has better performance than single puts.
+  rocksdb::WriteBatch batch;
+  // 'keys' and 'values' keep the data referenced by the Slices in 'batch' alive.
+  vector<string> keys;
+  keys.reserve(blocks.size());
+  vector<string> values;
+  values.reserve(blocks.size());
+  for (const auto* block : blocks) {
+    // Construct key.
+    keys.emplace_back(LogBlockManagerRdbMeta::ConstructRocksDBKey(id_, block->id()));
+
+    // Construct value.
+    block->id().CopyToPB(record.mutable_block_id());
+    record.set_offset(block->block_offset());
+    record.set_length(block->block_length());
+    string value;
+    record.SerializeToString(&value);
+    values.emplace_back(value);
+
+    // Add the put operation to batch.
+    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(batch.Put(
+        rocksdb::Slice(keys.back()), rocksdb::Slice(values.back()))));
+  }
+
+  // TODO(yingchun): Add a flag similar to --log_container_rdb_delete_batch_count to add metadata
+  //  records in batch.
+  rocksdb::Status s = rdb_->Write({}, &batch);
+  RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(s));
+
+  return Status::OK();
+}
+
+Status LogBlockContainerRdbMeta::SyncMetadata() {
+  VLOG(3) << "Syncing WAL of RocksDB in " << data_dir_->dir();
+  // TODO(yingchun): It's too costly to sync the RocksDB WAL on every call, so it's disabled:
+//  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
+//  if (FLAGS_enable_data_block_fsync) {
+//    if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
+//    RETURN_NOT_OK_HANDLE_ERROR(FromRdbStatus(rdb_->SyncWAL()));
+//  }
+  return Status::OK();
+}
+
 ///////////////////////////////////////////////////////////
 // LogBlockCreationTransaction
 ////////////////////////////////////////////////////////////
@@ -1975,6 +2372,7 @@ struct LogBlockContainerLoadResult {
     report.malformed_record_check.emplace();
     report.misaligned_block_check.emplace();
     report.partial_record_check.emplace();
+    report.corrupted_rdb_record_check.emplace();
   }
 };
 
@@ -2986,8 +3384,17 @@ void LogBlockManager::OpenDataDir(
     Status* result_status,
     std::atomic<int>* containers_processed,
     std::atomic<int>* containers_total) {
+  Status s = dir->Prepare();
+  if (!s.ok()) {
+    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
+        ErrorHandlerType::DISK_ERROR, dir));
+    *result_status = s.CloneAndPrepend(Substitute(
+        "Could not initialize $0", dir->dir()));
+    return;
+  }
+
   vector<string> children;
-  Status s = env_->GetChildren(dir->dir(), &children);
+  s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
     HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
         ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
@@ -2998,7 +3405,7 @@ void LogBlockManager::OpenDataDir(
 
   // Find all containers and open them.
   unordered_set<string> containers_seen;
-  results->reserve(children.size() / 2);
+  results->reserve(EstimateContainerCount(children.size()));
   for (const string& child : children) {
     string container_name;
     if (!TryStripSuffixString(
@@ -3261,7 +3668,6 @@ void LogBlockManagerNativeMeta::FindContainersToRepair(
   }
 }
 
-
 Status LogBlockManager::DoRepair(
     Dir* /*dir*/,
     FsReport* report,
@@ -3583,5 +3989,135 @@ int64_t LogBlockManager::LookupBlockLimit(int64_t fs_block_size) {
   return kPerFsBlockSizeBlockLimits.begin()->second;
 }
 
+Status LogBlockManagerRdbMeta::CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) {
+  return LogBlockContainerRdbMeta::Create(this, dir, container);
+}
+
+Status LogBlockManagerRdbMeta::OpenContainer(Dir* dir,
+                                             FsReport* report,
+                                             const string& id,
+                                             LogBlockContainerRefPtr* container) {
+  return LogBlockContainerRdbMeta::Open(this, dir, report, id, container);
+}
+
+void LogBlockManagerRdbMeta::FindContainersToRepair(
+    FsReport* report,
+    const ContainerBlocksByName& low_live_block_containers,
+    ContainersByName* containers_by_name) {
+  LogBlockManager::FindContainersToRepair(report, low_live_block_containers, containers_by_name);
+  if (report->corrupted_rdb_record_check) {
+    for (const auto& e : report->corrupted_rdb_record_check->entries) {
+      LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, e.container);
+      if (c) {
+        (*containers_by_name)[e.container] = c;
+      }
+    }
+  }
+}
+
+Status LogBlockManagerRdbMeta::DoRepair(
+    Dir* dir,
+    FsReport* report,
+    const ContainerBlocksByName& low_live_block_containers,
+    const ContainersByName& containers_by_name) {
+  RETURN_NOT_OK(LogBlockManager::DoRepair(dir, report, low_live_block_containers,
+                                          containers_by_name));
+  // Delete any incomplete container files.
+  //
+  // This is a non-fatal inconsistency; we can just as easily ignore the
+  // leftover container files.
+  if (report->incomplete_container_check) {
+    for (auto& ic : report->incomplete_container_check->entries) {
+      // Delete the metadata records.
+      // The ic.container is constructed as "<dir>/<cid>", where the "cid" is the container's id.
+      vector<string> dir_and_cid = Split(ic.container, "/", SkipEmpty());
+      CHECK(!dir_and_cid.empty());
+      const auto& cid = dir_and_cid.back();
+      WARN_NOT_OK_LBM_DISK_FAILURE(DeleteContainerMetadata(dir, cid),
+                                   "could not delete incomplete container metadata records");
+      // Delete the data file.
+      Status s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix));
+      if (!s.ok() && !s.IsNotFound()) {
+        WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container data file");
+      }
+      ic.repaired = true;
+    }
+  }
+
+  // Delete any corrupted records from RocksDB.
+  //
+  // This is a non-fatal inconsistency; we can just as easily delete the
+  // records.
+  if (report->corrupted_rdb_record_check) {
+    for (auto& e : report->corrupted_rdb_record_check->entries) {
+      // TODO(yingchun): optimize the delete operations to delete in batch.
+      rocksdb::Slice key(e.rocksdb_key);
+      auto s = down_cast<RdbDir*>(dir)->rdb()->Delete({}, key);
+      WARN_NOT_OK_LBM_DISK_FAILURE(
+          FromRdbStatus(s),
+          Substitute("could not delete container corrupted metadata record $0", e.rocksdb_key));
+      e.repaired = true;
+    }
+  }
+
+  return Status::OK();
+}
+
+Status LogBlockManagerRdbMeta::DeleteContainerMetadata(Dir* dir, const string& id) {
+  rocksdb::Slice begin_key(id);
+  string next_id = ObjectIdGenerator::NextOf(id);
+  rocksdb::Slice end_key(next_id);
+  auto* rdb = down_cast<RdbDir*>(dir)->rdb();
+  // TODO(yingchun): DeleteRange() seems not stable, even though it is now declared as usable
+  //  in production.
+  //  See https://github.com/facebook/rocksdb/blob/v7.7.3/include/rocksdb/db.h#L475
+  // auto s = rdb->DeleteRange(
+  //     rocksdb::WriteOptions(), rdb->DefaultColumnFamily(), begin_key, end_key);
+  int count = 0;
+  SCOPED_LOG_SLOW_EXECUTION(INFO, 100, Substitute("delete $0 metadata records from $1",
+                                                  count, dir->dir()));
+  // Performing deletes in a batch has better performance than single deletes.
+  rocksdb::WriteBatch batch;
+  // 'keys' keeps the data referenced by the Slices in 'batch' alive.
+  vector<string> keys;
+  keys.reserve(FLAGS_log_container_rdb_delete_batch_count);
+
+  rocksdb::ReadOptions scan_opt;
+  // Take advantage of Prefix-Seek, see https://github.com/facebook/rocksdb/wiki/Prefix-Seek.
+  scan_opt.prefix_same_as_start = true;
+  scan_opt.total_order_seek = false;
+  scan_opt.auto_prefix_mode = false;
+  scan_opt.readahead_size = 2 << 20;  // TODO(yingchun): make this configurable.
+  scan_opt.iterate_upper_bound = &end_key;
+  unique_ptr<rocksdb::Iterator> it(rdb->NewIterator(scan_opt));
+  it->Seek(begin_key);
+  while (it->Valid() && it->key().starts_with(begin_key)) {
+    keys.emplace_back(it->key().ToString());
+
+    // Add the delete operation to batch.
+    RETURN_NOT_OK(FromRdbStatus(batch.Delete(rocksdb::Slice(keys.back()))));
+
+    // Tune --log_container_rdb_delete_batch_count to achieve better performance.
+    if (batch.Count() >= FLAGS_log_container_rdb_delete_batch_count) {
+      RETURN_NOT_OK(FromRdbStatus(rdb->Write({}, &batch)));
+      batch.Clear();
+      keys.clear();
+    }
+
+    it->Next();
+    count++;
+  }
+
+  if (batch.Count() > 0) {
+    RETURN_NOT_OK(FromRdbStatus(rdb->Write({}, &batch)));
+  }
+
+  return Status::OK();
+}
+
+std::string LogBlockManagerRdbMeta::ConstructRocksDBKey(
+    const std::string& container_id, const BlockId& block_id) {
+  return Substitute("$0.$1", container_id, block_id.ToString());
+}
+
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index f2d20bb57..00d3ee849 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <cstddef>
 #include <cstdint>
 #include <deque>
 #include <map>
@@ -58,6 +59,7 @@ namespace internal {
 class LogBlock;
 class LogBlockContainer;
 class LogBlockContainerNativeMeta;
+class LogBlockContainerRdbMeta;
 class LogBlockDeletionTransaction;
 class LogWritableBlock;
 struct LogBlockContainerLoadResult;
@@ -67,6 +69,7 @@ struct LogBlockManagerMetrics;
 typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr;
 typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr;
 typedef scoped_refptr<internal::LogBlockContainerNativeMeta> LogBlockContainerNativeMetaRefPtr;
+typedef scoped_refptr<internal::LogBlockContainerRdbMeta> LogBlockContainerRdbMetaRefPtr;
 typedef std::unordered_map<std::string, std::vector<BlockRecordPB>> ContainerBlocksByName;
 typedef std::unordered_map<std::string, LogBlockContainerRefPtr> ContainersByName;
 
@@ -215,13 +218,13 @@ class LogBlockManager : public BlockManager {
                   BlockManagerOptions opts,
                   std::string tenant_id);
 
+  FRIEND_TEST(LogBlockManagerNativeMetaTest, TestMetadataTruncation);
   FRIEND_TEST(LogBlockManagerTest, TestAbortBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup);
   FRIEND_TEST(LogBlockManagerTest, TestFinalizeBlock);
   FRIEND_TEST(LogBlockManagerTest, TestLIFOContainerSelection);
   FRIEND_TEST(LogBlockManagerTest, TestLookupBlockLimit);
-  FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation);
   FRIEND_TEST(LogBlockManagerTest, TestMisalignedBlocksFuzz);
   FRIEND_TEST(LogBlockManagerTest, TestParseKernelRelease);
   FRIEND_TEST(LogBlockManagerTest, TestBumpBlockIds);
@@ -230,9 +233,9 @@ class LogBlockManager : public BlockManager {
 
   friend class internal::LogBlockContainer;
   friend class internal::LogBlockContainerNativeMeta;
+  friend class internal::LogBlockContainerRdbMeta;
   friend class internal::LogBlockDeletionTransaction;
   friend class internal::LogWritableBlock;
-  friend class LogBlockManagerTest;
 
   // Type for the actual block map used to store all live blocks.
   // We use sparse_hash_map<> here to reduce memory overhead.
@@ -416,6 +419,9 @@ class LogBlockManager : public BlockManager {
                    std::atomic<int>* containers_processed = nullptr,
                    std::atomic<int>* containers_total = nullptr);
 
+  // Estimate the count of containers according to the count of children directories.
+  virtual size_t EstimateContainerCount(size_t children_count) const = 0;
+
   // Reads records from one log block container in the data directory.
   // The result details will be collected into 'result'.
   void LoadContainer(Dir* dir,
@@ -537,6 +543,8 @@ class LogBlockManager : public BlockManager {
 // metadata is simple and performant at open time.
 class LogBlockManagerNativeMeta : public LogBlockManager {
  public:
+  static constexpr const char* const name() { return "log"; }
+
   LogBlockManagerNativeMeta(Env* env,
                             scoped_refptr<DataDirManager> dd_manager,
                             scoped_refptr<FsErrorManager> error_manager,
@@ -548,6 +556,10 @@ class LogBlockManagerNativeMeta : public LogBlockManager {
   }
 
  private:
+  size_t EstimateContainerCount(size_t children_count) const override {
+    return children_count / 2;
+  }
+
  Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) override;
 
   // Returns Status::Aborted() in the case that the metadata and data files
@@ -577,6 +589,84 @@ class LogBlockManagerNativeMeta : public LogBlockManager {
                   const ContainersByName& containers_by_name) override;
 };
 
+// All the container's metadata is written into a RocksDB instance which is
+// shared by all containers in the same data directory.
+// The metadata records in RocksDB are all of CREATE type; the records are
+// deleted from RocksDB when the corresponding blocks are removed from the
+// block manager. The data in the RocksDB instance is compacted in the
+// background automatically, see https://github.com/facebook/rocksdb/wiki/Compaction.
+//
+// Compared with LogBlockManagerNativeMeta, LogBlockManagerRdbMeta has better
+// performance when opening containers: the latter stores only live block
+// records, while the former needs to scan through all the records in the
+// metadata, which may adversely affect performance when the live block ratio
+// is very low. RocksDB also provides many configuration options that can be
+// tuned to minimize read/write and space amplification.
+class LogBlockManagerRdbMeta : public LogBlockManager {
+ public:
+  static constexpr const char* const name() { return "logr"; }
+
+  LogBlockManagerRdbMeta(Env* env,
+                         scoped_refptr<DataDirManager> dd_manager,
+                         scoped_refptr<FsErrorManager> error_manager,
+                         FileCache* file_cache,
+                         BlockManagerOptions opts,
+                         std::string tenant_id)
+      : LogBlockManager(env, std::move(dd_manager), std::move(error_manager),
+                        file_cache, std::move(opts), std::move(tenant_id)) {
+  }
+
+ private:
+  friend class internal::LogBlockContainerRdbMeta;
+  friend class RdbMetadataLBMCorruptor;
+  FRIEND_TEST(LogBlockManagerRdbMetaTest, TestHalfPresentContainer);
+
+  size_t EstimateContainerCount(size_t children_count) const override {
+    // TODO(yingchun): exclude the "rdb" directory when getting the children count.
+    return children_count;
+  }
+
+  Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) override;
+
+  // Returns Status::Aborted() in the case that the data part appears to have no
+  // data (e.g. due to a crash just after creating it but before writing any
+  // records).
+  Status OpenContainer(Dir* dir,
+                       FsReport* report,
+                       const std::string& id,
+                       LogBlockContainerRefPtr* container) override;
+
+  void FindContainersToRepair(FsReport* report,
+                              const ContainerBlocksByName& low_live_block_containers,
+                              ContainersByName* containers_by_name) override;
+
+  // Do the repair work.
+  //
+  // The following repairs will be performed:
+  // 1. Repairs in LogBlockManager::DoRepair().
+  // 2. Container data files in 'report->incomplete_container_check' will be deleted, and the
+  //    records of these containers in RocksDB will be deleted as well.
+  // 3. Container metadata records in 'report->corrupted_rdb_record_check' will be deleted from
+  //    RocksDB.
+  //
+  // Returns an error if repairing a fatal inconsistency failed.
+  Status DoRepair(Dir* dir,
+                  FsReport* report,
+                  const ContainerBlocksByName& low_live_block_containers,
+                  const ContainersByName& containers_by_name) override;
+
+  // Delete all the metadata records of a container.
+  //
+  // The metadata records, whose keys are prefixed by the container's 'id', are
+  // deleted from the RocksDB instance in 'dir'.
+  static Status DeleteContainerMetadata(Dir* dir, const std::string& id);
+
+  // Construct the key used in RocksDB.
+  //
+  // The key is constructed from the container's 'container_id' and the block's 'block_id'.
+  static std::string ConstructRocksDBKey(const std::string& container_id,
+                                         const BlockId& block_id);
+};
+
 } // namespace fs
 } // namespace kudu
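
[Editor's note] The key scheme documented above ("<container_id>.<block_id>") can be sketched standalone as follows; 'block_id_str' stands in for whatever BlockId::ToString() emits, and the helper names are illustrative rather than Kudu's actual symbols.

```cpp
#include <cassert>
#include <string>

// Build a RocksDB key as "<container_id>.<block_id>", mirroring the scheme
// used by ConstructRocksDBKey() in the patch above.
std::string ConstructRocksDBKey(const std::string& container_id,
                                const std::string& block_id_str) {
  return container_id + "." + block_id_str;
}

// All keys of one container share the "<container_id>." prefix; the trailing
// dot keeps container "c1" from matching keys of container "c10". This prefix
// property is what lets a whole container's records be dropped in one scan.
bool BelongsToContainer(const std::string& key,
                        const std::string& container_id) {
  const std::string prefix = container_id + ".";
  return key.compare(0, prefix.size(), prefix) == 0;
}
```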
 
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 0e7ca24b5..008a1539d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -55,7 +55,7 @@ add_dependencies(itest_util
   kudu-tserver)
 
 # Tests
-SET_KUDU_TEST_LINK_LIBS(itest_util gumbo-parser gumbo-query)
+SET_KUDU_TEST_LINK_LIBS(itest_util gumbo-parser gumbo-query rocksdb)
 ADD_KUDU_TEST(all_types-itest
   PROCESSORS 4
   NUM_SHARDS 8)
diff --git a/src/kudu/integration-tests/dense_node-itest.cc b/src/kudu/integration-tests/dense_node-itest.cc
index c63ce2b7e..afd849e6a 100644
--- a/src/kudu/integration-tests/dense_node-itest.cc
+++ b/src/kudu/integration-tests/dense_node-itest.cc
@@ -22,6 +22,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <tuple>
 #include <utility>
 #include <vector>
 
@@ -30,6 +31,8 @@
 #include <gtest/gtest.h>
 
 #include "kudu/client/schema.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
@@ -75,6 +78,7 @@ using client::KuduColumnSchema;
 using client::KuduSchema;
 using client::KuduSchemaBuilder;
 using cluster::ExternalMiniClusterOptions;
+using kudu::fs::BlockManager;
 using std::pair;
 using std::string;
 using std::unique_ptr;
@@ -83,10 +87,13 @@ using strings::Substitute;
 
 class DenseNodeTest :
   public ExternalMiniClusterITestBase,
-  public testing::WithParamInterface<bool> {
+  public testing::WithParamInterface<std::tuple<bool, string>> {
 };
 
-INSTANTIATE_TEST_SUITE_P(, DenseNodeTest, testing::Values(false, true));
+INSTANTIATE_TEST_SUITE_P(, DenseNodeTest,
+                         ::testing::Combine(
+                             ::testing::Bool(),
+                             ::testing::ValuesIn(BlockManager::block_manager_types())));
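
[Editor's note] The INSTANTIATE above switches the test from a single bool parameter to the full cross product of (encryption on/off) x (block manager type). A standalone sketch of that parameter space, where the "log"/"logr" list stands in for BlockManager::block_manager_types() and the function name is illustrative:

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <vector>

// Enumerate every (encrypt, block manager type) combination, mirroring what
// ::testing::Combine(::testing::Bool(), ::testing::ValuesIn(bm_types)) yields.
std::vector<std::tuple<bool, std::string>> CombineParams(
    const std::vector<std::string>& bm_types) {
  std::vector<std::tuple<bool, std::string>> params;
  for (bool encrypt : {false, true}) {
    for (const auto& bm : bm_types) {
      params.emplace_back(encrypt, bm);
    }
  }
  return params;
}
```

With two block manager types this doubles the test matrix from two runs to four.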
 
 // Integration test that simulates "dense" Kudu nodes.
 //
@@ -152,11 +159,15 @@ TEST_P(DenseNodeTest, RunTest) {
     opts.extra_master_flags.emplace_back("--never_fsync=false");
   }
 
-  if (GetParam()) {
+  if (std::get<0>(GetParam())) {
     opts.extra_master_flags.emplace_back("--encrypt_data_at_rest=true");
     opts.extra_tserver_flags.emplace_back("--encrypt_data_at_rest=true");
   }
 
+  FLAGS_block_manager = std::get<1>(GetParam());
+  opts.extra_master_flags.emplace_back(Substitute("--block_manager=$0", FLAGS_block_manager));
+  opts.extra_tserver_flags.emplace_back(Substitute("--block_manager=$0", FLAGS_block_manager));
+
   // With the amount of data we're going to write, we need to make sure the
   // tserver has enough time to start back up (startup is only considered to be
   // "complete" when the tserver has loaded all fs metadata from disk).
@@ -199,7 +210,7 @@ TEST_P(DenseNodeTest, RunTest) {
   // metrics are logged so that they're easier to find in the log output.
   vector<pair<string, int64_t>> metrics;
   vector<GaugePrototype<uint64>*> metric_prototypes;
-  if (FLAGS_block_manager == "log") {
+  if (FsManager::IsLogType(FLAGS_block_manager)) {
     metric_prototypes = { &METRIC_log_block_manager_blocks_under_management,
                           &METRIC_log_block_manager_bytes_under_management,
                           &METRIC_log_block_manager_containers,
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index b74fadc84..f501fe349 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -28,6 +28,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
@@ -78,6 +79,8 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_string(block_manager);
+
 METRIC_DECLARE_gauge_uint64(tablets_num_failed);
 
 using kudu::client::KuduClient;
@@ -123,6 +126,9 @@ int IntToKey(int i) {
 class TsRecoveryITest : public ExternalMiniClusterITestBase,
                         public ::testing::WithParamInterface<string> {
  public:
+  TsRecoveryITest() {
+    FLAGS_block_manager = GetParam();
+  }
 
  protected:
   void StartClusterOneTs(vector<string> extra_tserver_flags = {},
@@ -134,7 +140,7 @@ void TsRecoveryITest::StartClusterOneTs(vector<string> extra_tserver_flags,
                                         vector<string> extra_master_flags) {
   ExternalMiniClusterOptions opts;
   opts.num_tablet_servers = 1;
-  opts.block_manager_type = GetParam();
+  opts.block_manager_type = FLAGS_block_manager;
   opts.extra_tserver_flags = std::move(extra_tserver_flags);
   opts.extra_master_flags = std::move(extra_master_flags);
   StartClusterWithOpts(opts);
@@ -236,7 +242,7 @@ TEST_P(TsRecoveryITest, TestTabletRecoveryAfterSegmentDelete) {
 // Test for KUDU-2202 that ensures that blocks not found in the FS layer but
 // that are referenced by a tablet will not be reused.
 TEST_P(TsRecoveryITest, TestNoBlockIDReuseIfMissingBlocks) {
-  if (GetParam() != "log") {
+  if (FLAGS_block_manager != "log" && FLAGS_block_manager != "logr") {
     GTEST_SKIP() << "Missing blocks is currently only supported by the log "
                     "block manager. Exiting early!";
   }
@@ -308,7 +314,8 @@ TEST_P(TsRecoveryITest, TestNoBlockIDReuseIfMissingBlocks) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(data_dir, &children));
     for (const string& child : children) {
-      if (child != "." && child != ".." && child != fs::kInstanceMetadataFileName) {
+      if (child != "." && child != ".." &&
+          child != fs::kInstanceMetadataFileName && child != "rdb") {
         ASSERT_OK(env_->DeleteFile(JoinPathSegments(data_dir, child)));
       }
     }
@@ -586,7 +593,7 @@ TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) {
     FsManagerOpts opts;
     opts.wal_root = ets->wal_dir();
     opts.data_roots = ets->data_dirs();
-    opts.block_manager_type = GetParam();
+    opts.block_manager_type = FLAGS_block_manager;
     unique_ptr<FsManager> fs_manager(new FsManager(env_, opts));
     ASSERT_OK(fs_manager->Open());
     scoped_refptr<ConsensusMetadataManager> cmeta_manager(
@@ -780,7 +787,8 @@ class UpdaterThreads {
 // these updates would be mistakenly considered as "already flushed", despite
 // the fact that they were only written to the input rowset's memory stores, and
 // never hit disk.
-class Kudu969Test : public TsRecoveryITest {
+class Kudu969Test : public ExternalMiniClusterITestBase,
+                    public ::testing::WithParamInterface<string> {
 };
 INSTANTIATE_TEST_SUITE_P(DifferentFaultPoints,
                          Kudu969Test,
diff --git a/src/kudu/server/CMakeLists.txt b/src/kudu/server/CMakeLists.txt
index 4ea34145e..188104e26 100644
--- a/src/kudu/server/CMakeLists.txt
+++ b/src/kudu/server/CMakeLists.txt
@@ -83,6 +83,8 @@ SET_KUDU_TEST_LINK_LIBS(
   kudu_curl_util
   mini_kdc
   server_process
-  security_test_util)
+  security_test_util
+  rocksdb
+  lz4)
 ADD_KUDU_TEST(rpc_server-test)
 ADD_KUDU_TEST(webserver-test)
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 1d9abfe1b..6374bae64 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1529,7 +1529,7 @@ TEST_F(TestCompaction, TestCompactionFreesDiskSpace) {
 // Regression test for KUDU-1237, a bug in which empty flushes or compactions
 // would result in orphaning near-empty cfile blocks on the disk.
 TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
-  if (FLAGS_block_manager != "log") {
+  if (FLAGS_block_manager != "log" && FLAGS_block_manager != "logr") {
     GTEST_SKIP() << "Test requires the log block manager";
   }
 
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 452ded584..c31c564fa 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -142,6 +142,7 @@ set(KUDU_CLI_TOOL_LINK_LIBS
   kudu_util
   log
   master
+  rocksdb
   tablet
   tool_proto
   transactions
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 02478c1f7..ee93845de 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2112,6 +2112,10 @@ TEST_F(ToolTest, TestPbcTools) {
 }
 
 TEST_F(ToolTest, TestPbcToolsOnMultipleBlocks) {
+  // TODO(yingchun): `kudu rdb dump` and `kudu rdb edit` have to adapt to "logr" block manager.
+  if (FLAGS_block_manager == "logr") {
+    GTEST_SKIP() << "Skipping test, logr block manager is not yet supported";
+  }
   const int kNumCFileBlocks = 1024;
   const int kNumEntries = 1;
   const string kTestDir = GetTestPath("test");
@@ -4567,6 +4571,16 @@ TEST_F(ToolTest, TestLocalReplicaDelete) {
   const string& data_dir = JoinPathSegments(tserver_dir, "data");
   uint64_t size_before_delete;
   ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_before_delete));
+  const auto ExcludeIgnoredDirIfNeeded = [&] (uint64_t* total_size) {
+    if (FLAGS_block_manager == "logr") {
+      // Exclude the RocksDB data size.
+      uint64_t size_of_rdb;
+      ASSERT_OK(
+          env_->GetFileSizeOnDiskRecursively(JoinPathSegments(data_dir, "rdb"), &size_of_rdb));
+      *total_size -= size_of_rdb;
+    }
+  };
+  ExcludeIgnoredDirIfNeeded(&size_before_delete);
  NO_FATALS(RunActionStdoutNone(Substitute("local_replica delete $0 --fs_wal_dir=$1 "
                                            "--fs_data_dirs=$1 --clean_unsafe",
                                            tablet_id, tserver_dir)));
@@ -4602,6 +4616,7 @@ TEST_F(ToolTest, TestLocalReplicaDelete) {
   // indicates that some data has been deleted from disk.
   uint64_t size_after_delete;
   ASSERT_OK(env_->GetFileSizeOnDiskRecursively(data_dir, &size_after_delete));
+  ExcludeIgnoredDirIfNeeded(&size_after_delete);
   ASSERT_LT(size_after_delete, size_before_delete);
 
   // Since there was only one tablet on the node which was deleted by tool,
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 7a204a5e8..6b1229776 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -758,6 +758,11 @@ TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
   ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
 
   // The group should be the updated even after restarting the tablet server.
+  // When --block_manager == "logr", it's necessary to reset 'dd_manager', releasing
+  // the last 'dd_manager' reference.  That's to allow for releasing the RocksDB LOCK file.
+  // Otherwise the 'mini_server_' will fail starting later because of the failure to acquire
+  // the RocksDB LOCK file.  Reset it in any case, there is no side effect.
+  dd_manager.reset();
   NO_FATALS(ShutdownAndRebuildTablet(kNumDirs));
   dd_manager = mini_server_->server()->fs_manager()->dd_manager();
   ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
@@ -944,7 +949,7 @@ TEST_F(TabletServerStartupWebPageTest, TestLogBlockContainerMetrics) {
   // Validate populated metrics in case of zero containers during startup.
  ASSERT_OK(c.FetchURL(Substitute("http://$0/metrics", addr), &buf));
   string raw = buf.ToString();
-  if (mini_server_->options()->fs_opts.block_manager_type == "log") {
+  if (FsManager::IsLogType(mini_server_->options()->fs_opts.block_manager_type)) {
    ASSERT_STR_MATCHES(raw, "log_block_manager_total_containers_startup\",\n[ ]+\"value\": 0");
    ASSERT_STR_MATCHES(raw, "log_block_manager_processed_containers_startup\",\n[ ]+\"value\": 0");
    // Since we open each directory and read all the contents, the time taken might not be
@@ -3984,7 +3989,7 @@ TEST_F(TabletServerTest, TestDeleteTablet) {
     METRIC_log_block_manager_blocks_under_management.Instantiate(
         mini_server_->server()->metric_entity(), 0);
   const int block_count_before_flush = ondisk->value();
-  if (FLAGS_block_manager == "log") {
+  if (FsManager::IsLogType(FLAGS_block_manager)) {
     ASSERT_EQ(block_count_before_flush, 0);
   }
 
@@ -3995,7 +4000,7 @@ TEST_F(TabletServerTest, TestDeleteTablet) {
   NO_FATALS(InsertTestRowsRemote(2, 1));
 
   const int block_count_after_flush = ondisk->value();
-  if (FLAGS_block_manager == "log") {
+  if (FsManager::IsLogType(FLAGS_block_manager)) {
     ASSERT_GT(block_count_after_flush, block_count_before_flush);
   }
 
@@ -4034,7 +4039,7 @@ TEST_F(TabletServerTest, TestDeleteTablet) {
 
   // Verify data was actually removed.
   const int block_count_after_delete = ondisk->value();
-  if (FLAGS_block_manager == "log") {
+  if (FsManager::IsLogType(FLAGS_block_manager)) {
     ASSERT_EQ(block_count_after_delete, 0);
   }
 
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index a921d9a68..e71e11c80 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -659,7 +659,8 @@ ADD_KUDU_TEST(compression/compression-test)
 if(NOT NO_TESTS)
   target_link_libraries(compression-test
     cfile
-    kudu_util_compression)
+    kudu_util_compression
+    rocksdb)
 endif()
 
 #######################################
@@ -679,7 +680,9 @@ if(NOT NO_TESTS)
   target_link_libraries(jwt-util-test
     mini_oidc
     server_process
-    jwt_test_certs)
+    jwt_test_certs
+    rocksdb
+    lz4)
 endif()
 
 #######################################
diff --git a/thirdparty/build-definitions.sh b/thirdparty/build-definitions.sh
index 74ede0024..9445cb170 100644
--- a/thirdparty/build-definitions.sh
+++ b/thirdparty/build-definitions.sh
@@ -1189,6 +1189,9 @@ build_rocksdb() {
     CXXFLAGS="$EXTRA_CXXFLAGS -fPIC" \
     cmake \
     -DROCKSDB_BUILD_SHARED=ON \
+    -DWITH_LZ4=ON \
+    -Dlz4_ROOT_DIR=$PREFIX \
+    -DUSE_RTTI=ON \
     -DFAIL_ON_WARNINGS=OFF \
     -DWITH_BENCHMARK_TOOLS=OFF \
     -DWITH_CORE_TOOLS=OFF \
@@ -1196,10 +1199,9 @@ build_rocksdb() {
     -DWITH_TRACE_TOOLS=OFF \
     -DWITH_JNI=OFF \
     -DWITH_LIBURING=OFF \
-    -DWITH_LZ4=ON \
     -DWITH_ZSTD=OFF \
-    -DWITH_SNAPPY=ON \
     -DWITH_BZ2=OFF \
+    -DWITH_SNAPPY=OFF \
     -DWITH_TESTS=OFF \
     -DWITH_GFLAGS=OFF \
     -DCMAKE_BUILD_TYPE=release \
