This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new bee128371 [multi-tenancy] add add/remove tenant api in fs layer
bee128371 is described below

commit bee1283718fda9d9d28455e62501e52f78fbf565
Author: kedeng <[email protected]>
AuthorDate: Thu Jun 29 15:23:39 2023 +0800

    [multi-tenancy] add add/remove tenant api in fs layer
    
    This patch mainly adds the API for adding and removing tenants
    in the fs layer.
    
    Additionally, I adjusted the env, dd_manager, block_manager,
    and other relevant parameters to be tenant-related. The default
    tenant, which is used whether or not the multi-tenant feature is
    enabled, uses the default parameters. New tenants created with
    the multi-tenant feature enabled have their own separate
    environment parameters.
    
    Also, I added unit tests to ensure the validity of the newly added
    logic.
    
    However, this patch leaves the standardization work for tenant
    storage paths in multi-tenant scenarios to the next patch, where
    I will focus on implementing this part.
    
    Change-Id: I3f31d3ddd636952f8bd432330afbde018169a2a1
    Reviewed-on: http://gerrit.cloudera.org:8080/20144
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <[email protected]>
---
 src/kudu/cfile/cfile-test.cc                 |   2 +-
 src/kudu/fs/block_manager-stress-test.cc     |  16 +-
 src/kudu/fs/block_manager-test.cc            |  22 +-
 src/kudu/fs/block_manager.h                  |  22 +-
 src/kudu/fs/data_dirs-test.cc                |   2 +-
 src/kudu/fs/data_dirs.cc                     |  18 +-
 src/kudu/fs/data_dirs.h                      |  17 +-
 src/kudu/fs/dir_manager.cc                   |   7 +-
 src/kudu/fs/dir_manager.h                    |  10 +-
 src/kudu/fs/error_manager-test.cc            |  23 +-
 src/kudu/fs/error_manager.cc                 |   9 +-
 src/kudu/fs/error_manager.h                  |  26 +-
 src/kudu/fs/file_block_manager.cc            |  51 ++--
 src/kudu/fs/file_block_manager.h             |  24 +-
 src/kudu/fs/fs_manager-test.cc               | 268 ++++++++++++++---
 src/kudu/fs/fs_manager.cc                    | 411 +++++++++++++++++++++++----
 src/kudu/fs/fs_manager.h                     | 193 +++++++++++--
 src/kudu/fs/log_block_manager-test.cc        |  36 +--
 src/kudu/fs/log_block_manager.cc             |  61 ++--
 src/kudu/fs/log_block_manager.h              |  35 ++-
 src/kudu/master/master.cc                    |   8 +-
 src/kudu/tablet/compaction-test.cc           |   2 +-
 src/kudu/tablet/compaction.cc                |   4 +-
 src/kudu/tablet/compaction.h                 |   3 +-
 src/kudu/tablet/delta_compaction.cc          |   9 +-
 src/kudu/tablet/diskrowset.cc                |   5 +-
 src/kudu/tablet/tablet_metadata.cc           |   2 +-
 src/kudu/tools/tool_action_fs.cc             |   2 +-
 src/kudu/tserver/tablet_copy_client-test.cc  |   4 +-
 src/kudu/tserver/tablet_copy_client.cc       |   2 +-
 src/kudu/tserver/tablet_copy_service-test.cc |   2 +-
 src/kudu/tserver/tablet_server-test.cc       |   2 +-
 src/kudu/tserver/tablet_server.cc            |  10 +-
 src/kudu/tserver/ts_tablet_manager.cc        |   5 +-
 src/kudu/tserver/ts_tablet_manager.h         |   2 +-
 35 files changed, 1018 insertions(+), 297 deletions(-)

diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index cfb19f367..761eed50e 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -975,7 +975,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestReleaseBlock) {
   WriterOptions opts;
   CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
   ASSERT_OK(w.Start());
-  BlockManager* bm = fs_manager_->block_manager();
+  auto bm = fs_manager_->block_manager();
   unique_ptr<fs::BlockCreationTransaction> transaction = 
bm->NewCreationTransaction();
   ASSERT_OK(w.FinishAndReleaseBlock(transaction.get()));
   ASSERT_OK(transaction->CommitCreatedBlocks());
diff --git a/src/kudu/fs/block_manager-stress-test.cc 
b/src/kudu/fs/block_manager-stress-test.cc
index f8f042836..4dd49af5c 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -36,6 +36,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/file_block_manager.h" // IWYU pragma: keep
 #include "kudu/fs/fs.pb.h"
@@ -149,7 +150,7 @@ class BlockManagerStressTest : public KuduTest {
     // Defer block manager creation until after the above flags are set.
     bm_.reset(CreateBlockManager());
     CHECK_OK(file_cache_.Init());
-    CHECK_OK(bm_->Open(nullptr, nullptr, nullptr));
+    CHECK_OK(bm_->Open(nullptr, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
     CHECK_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
     CHECK_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, 
&test_group_pb_));
   }
@@ -186,8 +187,9 @@ class BlockManagerStressTest : public KuduTest {
       CHECK_OK(DataDirManager::OpenExistingForTests(env_, data_dirs,
           DataDirManagerOptions(), &dd_manager_));
     }
-    return new T(env_, dd_manager_.get(), &error_manager_,
-                 &file_cache_, BlockManagerOptions());
+    error_manager_ = new FsErrorManager();
+    return new T(env_, dd_manager_.get(), error_manager_,
+                 &file_cache_, BlockManagerOptions(), fs::kDefaultTenantName);
   }
 
   void RunTest(double secs) {
@@ -254,13 +256,13 @@ class BlockManagerStressTest : public KuduTest {
   // Protects written_blocks_.
   simple_spinlock lock_;
 
-  unique_ptr<DataDirManager> dd_manager_;
+  scoped_refptr<DataDirManager> dd_manager_;
 
-  FsErrorManager error_manager_;
+  scoped_refptr<FsErrorManager> error_manager_;
 
   FileCache file_cache_;
 
-  unique_ptr<BlockManager> bm_;
+  scoped_refptr<BlockManager> bm_;
 
   // Test group of disk to spread data across.
   DataDirGroupPB test_group_pb_;
@@ -542,7 +544,7 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
     LOG(INFO) << "Running on populated block manager (restart #" << i << ")";
     this->bm_.reset(this->CreateBlockManager());
     FsReport report;
-    ASSERT_OK(this->bm_->Open(&report, nullptr, nullptr));
+    ASSERT_OK(this->bm_->Open(&report, 
BlockManager::MergeReport::NOT_REQUIRED, nullptr, nullptr));
     
ASSERT_OK(this->dd_manager_->LoadDataDirGroupFromPB(this->test_tablet_name_,
                                                         this->test_group_pb_));
     ASSERT_OK(report.LogAndCheckForFatalErrors());
diff --git a/src/kudu/fs/block_manager-test.cc 
b/src/kudu/fs/block_manager-test.cc
index 90dbe43e6..1daac8cf2 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -37,6 +37,7 @@
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
@@ -130,7 +131,7 @@ class BlockManagerTest : public KuduTest {
   void SetUp() override {
     // Pass in a report to prevent the block manager from logging 
unnecessarily.
     FsReport report;
-    ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+    ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
     ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
     ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, 
&test_group_pb_));
   }
@@ -168,8 +169,9 @@ class BlockManagerTest : public KuduTest {
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
     opts.parent_mem_tracker = parent_mem_tracker;
-    return new T(env_, this->dd_manager_.get(), &error_manager_,
-                 &file_cache_, std::move(opts));
+    error_manager_ = new FsErrorManager();
+    return new T(env_, this->dd_manager_.get(), error_manager_,
+                 &file_cache_, std::move(opts), fs::kDefaultTenantName);
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
@@ -191,7 +193,7 @@ class BlockManagerTest : public KuduTest {
           env_, paths, opts, &dd_manager_));
     }
     bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker));
-    RETURN_NOT_OK(bm_->Open(nullptr, nullptr, nullptr));
+    RETURN_NOT_OK(bm_->Open(nullptr, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
 
     // Certain tests may maintain their own directory groups, in which case
     // the default test group should not be used.
@@ -238,10 +240,10 @@ class BlockManagerTest : public KuduTest {
   DataDirGroupPB test_group_pb_;
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
-  FsErrorManager error_manager_;
-  unique_ptr<DataDirManager> dd_manager_;
+  scoped_refptr<FsErrorManager> error_manager_;
+  scoped_refptr<DataDirManager> dd_manager_;
   FileCache file_cache_;
-  unique_ptr<T> bm_;
+  scoped_refptr<T> bm_;
 };
 
 template <>
@@ -249,7 +251,7 @@ void BlockManagerTest<LogBlockManagerNativeMeta>::SetUp() {
   RETURN_NOT_LOG_BLOCK_MANAGER();
   // Pass in a report to prevent the block manager from logging unnecessarily.
   FsReport report;
-  ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+  ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
   ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
 
   // Store the DataDirGroupPB for tests that reopen the block manager.
@@ -748,10 +750,10 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
   // The existing block manager is left open, which proxies for the process
   // having crashed without cleanly shutting down the block manager. The
   // on-disk metadata should still be clean.
-  unique_ptr<BlockManager> new_bm(this->CreateBlockManager(
+  scoped_refptr<BlockManager> new_bm(this->CreateBlockManager(
       scoped_refptr<MetricEntity>(),
       MemTracker::CreateTracker(-1, "other tracker")));
-  ASSERT_OK(new_bm->Open(nullptr, nullptr, nullptr));
+  ASSERT_OK(new_bm->Open(nullptr, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
 
   // Test that the state of all three blocks is properly reflected.
   unique_ptr<ReadableBlock> read_block;
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index aad77adb9..b33e47a65 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -195,7 +195,7 @@ struct BlockManagerOptions {
 
 // Utilities for Kudu block lifecycle management. All methods are
 // thread-safe.
-class BlockManager {
+class BlockManager : public RefCountedThreadSafe<BlockManager> {
  public:
   // Lists the available block manager types.
   static std::vector<std::string> block_manager_types() {
@@ -208,6 +208,15 @@ class BlockManager {
 
   virtual ~BlockManager() {}
 
+  enum class MergeReport {
+    NOT_REQUIRED,    ///< Do no fs report merge operation when open block 
manager.
+                     ///< The report only records the data of this open 
operation.
+
+    REQUIRED         ///< Do fs report merge operation when open block manager 
if report ptr
+                     ///  is not null.
+                     ///< The report updates the data of this operation to the 
incoming data.
+  };
+
   // Opens an existing on-disk representation of this block manager and
   // checks it for inconsistencies. If found, and if the block manager was not
   // constructed in read-only mode, an attempt will be made to repair them.
@@ -223,7 +232,8 @@ class BlockManager {
   // If 'containers_processed' and 'containers_total' are not nullptr, they 
will
   // be populated with total containers attempted to be opened/processed and
   // total containers present respectively.
-  virtual Status Open(FsReport* report, std::atomic<int>* containers_processed,
+  virtual Status Open(FsReport* report, MergeReport need_merage,
+                      std::atomic<int>* containers_processed,
                       std::atomic<int>* containers_total) = 0;
 
   // Creates a new block using the provided options and opens it for
@@ -273,7 +283,13 @@ class BlockManager {
   virtual void NotifyBlockId(BlockId block_id) = 0;
 
   // Exposes the FsErrorManager used to handle fs errors.
-  virtual FsErrorManager* error_manager() = 0;
+  virtual scoped_refptr<FsErrorManager> error_manager() = 0;
+
+  // Exposes the tenant id.
+  virtual std::string tenant_id() const = 0;
+
+ private:
+  friend class RefCountedThreadSafe<BlockManager>;
 };
 
 // Group a set of block creations together in a transaction. This has two
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index 3584b71c1..9d333d217 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -110,7 +110,7 @@ class DataDirsTest : public KuduTest {
   const CreateBlockOptions test_block_opts_;
   MetricRegistry registry_;
   scoped_refptr<MetricEntity> entity_;
-  std::unique_ptr<DataDirManager> dd_manager_;
+  scoped_refptr<DataDirManager> dd_manager_;
 };
 
 TEST_F(DataDirsTest, TestCreateGroup) {
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index b5dc54c00..c8b4c8960 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -222,7 +222,7 @@ int DataDir::reserved_bytes() const {
 ////////////////////////////////////////////////////////////
 
 DataDirManagerOptions::DataDirManagerOptions()
-    : DirManagerOptions(FLAGS_block_manager) {}
+    : DirManagerOptions(FLAGS_block_manager, fs::kDefaultTenantID) {}
 
 DataDirManager::DataDirManager(Env* env,
                                const DataDirManagerOptions& opts,
@@ -233,9 +233,9 @@ DataDirManager::DataDirManager(Env* env,
                  opts, std::move(canonicalized_data_roots)) {}
 
 Status DataDirManager::OpenExistingForTests(Env* env,
-                                            vector<string> data_fs_roots,
+                                            const vector<string>& 
data_fs_roots,
                                             const DataDirManagerOptions& opts,
-                                            unique_ptr<DataDirManager>* 
dd_manager) {
+                                            
scoped_refptr<kudu::fs::DataDirManager>* dd_manager) {
   CanonicalizedRootsList roots;
   for (const auto& r : data_fs_roots) {
     roots.push_back({ r, Status::OK() });
@@ -245,17 +245,17 @@ Status DataDirManager::OpenExistingForTests(Env* env,
 
 Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList 
data_fs_roots,
                                     const DataDirManagerOptions& opts,
-                                    unique_ptr<DataDirManager>* dd_manager) {
-  unique_ptr<DataDirManager> dm;
+                                    scoped_refptr<kudu::fs::DataDirManager>* 
dd_manager) {
+  scoped_refptr<DataDirManager> dm;
   dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots)));
   RETURN_NOT_OK(dm->Open());
   dd_manager->swap(dm);
   return Status::OK();
 }
 
-Status DataDirManager::CreateNewForTests(Env* env, vector<string> 
data_fs_roots,
+Status DataDirManager::CreateNewForTests(Env* env, const vector<string>& 
data_fs_roots,
                                          const DataDirManagerOptions& opts,
-                                         unique_ptr<DataDirManager>* 
dd_manager) {
+                                         
scoped_refptr<kudu::fs::DataDirManager>* dd_manager) {
   CanonicalizedRootsList roots;
   for (const auto& r : data_fs_roots) {
     roots.push_back({ r, Status::OK() });
@@ -265,8 +265,8 @@ Status DataDirManager::CreateNewForTests(Env* env, 
vector<string> data_fs_roots,
 
 Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList 
data_fs_roots,
                                  const DataDirManagerOptions& opts,
-                                 unique_ptr<DataDirManager>* dd_manager) {
-  unique_ptr<DataDirManager> dm;
+                                 scoped_refptr<kudu::fs::DataDirManager>* 
dd_manager) {
+  scoped_refptr<DataDirManager> dm;
   dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots)));
   RETURN_NOT_OK(dm->Create());
   RETURN_NOT_OK(dm->Open());
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index c3083720d..d6f22d66b 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -106,7 +106,8 @@ struct DataDirManagerOptions : public DirManagerOptions {
 
 // Encapsulates knowledge of data directory management on behalf of block
 // managers.
-class DataDirManager : public DirManager {
+class DataDirManager : public DirManager,
+                       public RefCountedThreadSafe<DataDirManager> {
  public:
   enum class DirDistributionMode {
     ACROSS_ALL_DIRS,
@@ -116,20 +117,20 @@ class DataDirManager : public DirManager {
   // Public static initializers for use in tests. When used, data_fs_roots is
   // expected to be the successfully canonicalized directories.
   static Status CreateNewForTests(Env* env,
-                                  std::vector<std::string> data_fs_roots,
+                                  const std::vector<std::string>& 
data_fs_roots,
                                   const DataDirManagerOptions& opts,
-                                  std::unique_ptr<DataDirManager>* dd_manager);
+                                  scoped_refptr<DataDirManager>* dd_manager);
   static Status OpenExistingForTests(Env* env,
-                                     std::vector<std::string> data_fs_roots,
+                                     const std::vector<std::string>& 
data_fs_roots,
                                      const DataDirManagerOptions& opts,
-                                     std::unique_ptr<DataDirManager>* 
dd_manager);
+                                     scoped_refptr<DataDirManager>* 
dd_manager);
 
   // Constructs a directory manager and creates its necessary files on-disk.
   //
   // Returns an error if any of the directories already exist.
   static Status CreateNew(Env* env, CanonicalizedRootsList data_fs_roots,
                           const DataDirManagerOptions& opts,
-                          std::unique_ptr<DataDirManager>* dd_manager);
+                          scoped_refptr<DataDirManager>* dd_manager);
 
   // Constructs a directory manager and indexes the files found on-disk.
   //
@@ -137,7 +138,7 @@ class DataDirManager : public DirManager {
   // max allowed, or if locks need to be acquired and cannot be.
   static Status OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots,
                              const DataDirManagerOptions& opts,
-                             std::unique_ptr<DataDirManager>* dd_manager);
+                             scoped_refptr<DataDirManager>* dd_manager);
 
   // Deserializes a DataDirGroupPB and associates the resulting DataDirGroup
   // with a tablet_id.
@@ -196,6 +197,8 @@ class DataDirManager : public DirManager {
   FRIEND_TEST(DataDirsTest, TestLoadBalancingBias);
   FRIEND_TEST(DataDirsTest, TestLoadBalancingDistribution);
   FRIEND_TEST(DataDirsTest, TestFailedDirNotAddedToGroup);
+  friend class RefCountedThreadSafe<DataDirManager>;
+  ~DataDirManager() override {}
 
   // Populates the maps to index the given directories.
   Status PopulateDirectoryMaps(const std::vector<std::unique_ptr<Dir>>& dirs) 
override;
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index 61f3e92f5..94d7c945b 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -159,8 +159,11 @@ Status Dir::RefreshAvailableSpace(RefreshMode mode) {
   return Status::OK();
 }
 
-DirManagerOptions::DirManagerOptions(const string& dir_type)
-    : dir_type(dir_type), read_only(false),
+DirManagerOptions::DirManagerOptions(string dir_type,
+                                     string tid)
+    : dir_type(std::move(dir_type)),
+      tenant_id(std::move(tid)),
+      read_only(false),
       update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {}
 
 
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index b7a54fe40..4502740b5 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -187,6 +187,11 @@ struct DirManagerOptions {
   // Must not be empty.
   std::string dir_type;
 
+  // The id used to distinguish different tenants.
+  //
+  // Must not be empty.
+  std::string tenant_id;
+
   // The entity under which all metrics should be grouped. If null, metrics
   // will not be produced.
   //
@@ -205,7 +210,8 @@ struct DirManagerOptions {
   UpdateInstanceBehavior update_instances;
 
  protected:
-  explicit DirManagerOptions(const std::string& dir_type);
+  explicit DirManagerOptions(std::string dir_type,
+                             std::string tid);
 };
 
 class DirManager {
@@ -287,6 +293,8 @@ class DirManager {
   // the given UUID index.
   std::set<std::string> FindTabletsByDirUuidIdx(int uuid_idx) const;
 
+  const std::string tenant_id() { return opts_.tenant_id; }
+
   // Create a new directory using the appropriate directory implementation.
   virtual std::unique_ptr<Dir> CreateNewDir(Env* env,
                                             DirMetrics* metrics,
diff --git a/src/kudu/fs/error_manager-test.cc 
b/src/kudu/fs/error_manager-test.cc
index 538d3dbe4..26e6c4b69 100644
--- a/src/kudu/fs/error_manager-test.cc
+++ b/src/kudu/fs/error_manager-test.cc
@@ -31,6 +31,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/threading/thread_collision_warner.h"
 #include "kudu/util/monotime.h"
@@ -86,7 +87,7 @@ class FsErrorManagerTest : public KuduTest {
   // time, it is likely that some will write to the same entry.
   //
   // NOTE: this can be curried into an ErrorNotificationCb.
-  void SleepAndWriteFirstEmptyCb(int i, const string& /* s */) {
+  void SleepAndWriteFirstEmptyCb(int i, const string& /* s */, const string& 
/* u */) {
     DFAKE_SCOPED_LOCK(fake_lock_);
     int first_available = FindFirst(-1);
     SleepForRand();
@@ -107,14 +108,14 @@ class FsErrorManagerTest : public KuduTest {
     return positions;
   }
 
-  FsErrorManager* em() const { return em_.get(); }
+  scoped_refptr<FsErrorManager> em() const { return em_; }
 
  protected:
   // The single vector that the error notification callbacks will all write to.
   vector<int> test_vec_;
 
  private:
-  unique_ptr<FsErrorManager> em_;
+  scoped_refptr<FsErrorManager> em_;
 
   // Fake lock used to ensure threads don't run error-handling at the same 
time.
   DFAKE_MUTEX(fake_lock_);
@@ -130,8 +131,8 @@ TEST_F(FsErrorManagerTest, TestBasicRegistration) {
   // Register a callback to update the first '-1' entry in test_vec_ to '0'
   // after waiting a random amount of time.
   em()->SetErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
-        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid);
+      ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string& 
tenant_id) {
+        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid, 
tenant_id);
       });
   em()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, "");
   ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK_ERROR));
@@ -143,8 +144,8 @@ TEST_F(FsErrorManagerTest, TestBasicRegistration) {
 
   // Now register another callback.
   em()->SetErrorNotificationCb(
-      ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid) {
-        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS, 
uuid);
+      ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid, const 
string& tenant_id) {
+        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS, 
uuid, tenant_id);
       });
   em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
   ASSERT_EQ(1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));
@@ -165,12 +166,12 @@ TEST_F(FsErrorManagerTest, TestBasicRegistration) {
 // Test that the callbacks get run serially.
 TEST_F(FsErrorManagerTest, TestSerialization) {
   em()->SetErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
-        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid);
+      ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string& 
tenant_id) {
+        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid, 
tenant_id);
       });
   em()->SetErrorNotificationCb(
-      ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid) {
-        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS, 
uuid);
+      ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid, const 
string& tenant_id) {
+        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS, 
uuid, tenant_id);
       });
 
   // Swap back and forth between error-handler type.
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
index 55b78dbdc..b789f8154 100644
--- a/src/kudu/fs/error_manager.cc
+++ b/src/kudu/fs/error_manager.cc
@@ -29,7 +29,8 @@ namespace kudu {
 namespace fs {
 
 // Default error-handling callback that no-ops.
-static void DoNothingErrorNotification(const string& /* uuid */) {}
+static void DoNothingErrorNotification(const string& /* uuid */,
+                                       const string& /* tenant_id */) {}
 
 FsErrorManager::FsErrorManager() {
   InsertOrDie(&callbacks_, ErrorHandlerType::DISK_ERROR, 
&DoNothingErrorNotification);
@@ -47,9 +48,11 @@ void 
FsErrorManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
   EmplaceOrUpdate(&callbacks_, e, &DoNothingErrorNotification);
 }
 
-void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e, const string& 
uuid) const {
+void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e,
+                                            const string& uuid,
+                                            const string& tenant_id) const {
   std::lock_guard<Mutex> l(lock_);
-  FindOrDie(callbacks_, e)(uuid);
+  FindOrDie(callbacks_, e)(uuid, tenant_id);
 }
 
 }  // namespace fs
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index 8a32d5714..8c89195a0 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -26,17 +26,18 @@
 #include "kudu/fs/dir_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/mutex.h"
 
 namespace kudu {
 namespace fs {
 
 // Callback to error-handling code. The input string is the UUID a failed
-// component.
+// component and the ID of the corresponding tenant.
 //
 // e.g. the ErrorNotificationCb for disk failure handling takes the UUID of a
 // directory, marks it failed, and shuts down the tablets in that directory.
-typedef std::function<void(const std::string&)> ErrorNotificationCb;
+typedef std::function<void(const std::string&, const std::string&)> 
ErrorNotificationCb;
 
 // Evaluates the expression and handles it if it results in an error.
 // Returns if the status is an error.
@@ -129,7 +130,7 @@ enum ErrorHandlerType {
 // e.g. the TSTabletManager registers a callback to handle disk failure.
 // Blocks and other entities that may hit disk failures can call it without
 // knowing about the TSTabletManager.
-class FsErrorManager {
+class FsErrorManager : public RefCountedThreadSafe<FsErrorManager> {
  public:
   FsErrorManager();
 
@@ -146,15 +147,28 @@ class FsErrorManager {
   // Runs the error notification callback.
   //
   // 'uuid' is the full UUID of the component that failed.
-  void RunErrorNotificationCb(ErrorHandlerType e, const std::string& uuid) 
const;
+  // 'tenant_id' is used to indicate the corresponding tenant, if not 
specified,
+  // we will treat it as the default tenant.
+  void RunErrorNotificationCb(
+      ErrorHandlerType e,
+      const std::string& uuid,
+      const std::string& tenant_id = fs::kDefaultTenantID) const;
 
   // Runs the error notification callback with the UUID of 'dir'.
-  void RunErrorNotificationCb(ErrorHandlerType e, const Dir* dir) const {
+  //
+  // 'tenant_id' is used to indicate the corresponding tenant, if not 
specified,
+  // we will treat it as the default tenant.
+  void RunErrorNotificationCb(ErrorHandlerType e,
+                              const Dir* dir,
+                              const std::string& tenant_id = 
fs::kDefaultTenantID) const {
     DCHECK_EQ(e, ErrorHandlerType::DISK_ERROR);
-    RunErrorNotificationCb(e, dir->instance()->uuid());
+    RunErrorNotificationCb(e, dir->instance()->uuid(), tenant_id);
   }
 
  private:
+  friend class RefCountedThreadSafe<FsErrorManager>;
+  ~FsErrorManager() {}
+
    // Callbacks to be run when an error occurs.
   std::unordered_map<ErrorHandlerType, ErrorNotificationCb, std::hash<int>> 
callbacks_;
 
diff --git a/src/kudu/fs/file_block_manager.cc 
b/src/kudu/fs/file_block_manager.cc
index def8b9f0a..e8524fd94 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -96,7 +96,7 @@ namespace internal {
 class FileBlockLocation {
  public:
   // Empty constructor
-  FileBlockLocation() {
+  FileBlockLocation() : data_dir_(nullptr) {
   }
 
   // Construct a location from its constituent parts.
@@ -260,6 +260,8 @@ class FileWritableBlock final : public WritableBlock {
 
   void HandleError(const Status& s) const;
 
+  string tenant_id() const { return block_manager_->tenant_id(); }
+
   // Starts an asynchronous flush of dirty block data to disk.
   Status FlushDataAsync();
 
@@ -315,7 +317,7 @@ FileWritableBlock::~FileWritableBlock() {
 void FileWritableBlock::HandleError(const Status& s) const {
   HANDLE_DISK_FAILURE(
       s, block_manager_->error_manager()->RunErrorNotificationCb(
-          ErrorHandlerType::DISK_ERROR, location_.data_dir()));
+          ErrorHandlerType::DISK_ERROR, location_.data_dir(), tenant_id()));
 }
 
 Status FileWritableBlock::Close() {
@@ -476,7 +478,7 @@ void FileReadableBlock::HandleError(const Status& s) const {
   const Dir* dir = block_manager_->dd_manager_->FindDirByUuidIndex(
       internal::FileBlockLocation::GetDirIdx(block_id_));
   HANDLE_DISK_FAILURE(s, 
block_manager_->error_manager()->RunErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, dir));
+      ErrorHandlerType::DISK_ERROR, dir, 
block_manager_->dd_manager_->tenant_id()));
 }
 
 FileReadableBlock::FileReadableBlock(FileBlockManager* block_manager,
@@ -684,7 +686,8 @@ Status FileBlockManager::SyncMetadata(const 
internal::FileBlockLocation& locatio
       if (metrics_) metrics_->total_disk_sync->Increment();
       RETURN_NOT_OK_HANDLE_DISK_FAILURE(env_->SyncDir(s),
           error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
-                                                 location.data_dir()));
+                                                 location.data_dir(),
+                                                 tenant_id()));
     }
   }
   return Status::OK();
@@ -702,20 +705,22 @@ bool FileBlockManager::FindBlockPath(const BlockId& 
block_id,
 }
 
 FileBlockManager::FileBlockManager(Env* env,
-                                   DataDirManager* dd_manager,
-                                   FsErrorManager* error_manager,
+                                   scoped_refptr<DataDirManager> dd_manager,
+                                   scoped_refptr<FsErrorManager> error_manager,
                                    FileCache* file_cache,
-                                   BlockManagerOptions opts)
+                                   BlockManagerOptions opts,
+                                   string tenant_id)
   : env_(DCHECK_NOTNULL(env)),
-    dd_manager_(dd_manager),
-    error_manager_(DCHECK_NOTNULL(error_manager)),
+    dd_manager_(std::move(dd_manager)),
+    error_manager_(DCHECK_NOTNULL(std::move(error_manager))),
     opts_(std::move(opts)),
     file_cache_(file_cache),
     rand_(GetRandomSeed32()),
     next_block_id_(rand_.Next64()),
     mem_tracker_(MemTracker::CreateTracker(-1,
                                            "file_block_manager",
-                                           opts_.parent_mem_tracker)) {
+                                           opts_.parent_mem_tracker)),
+    tenant_id_(std::move(tenant_id)) {
   if (opts_.metric_entity) {
     metrics_.reset(new internal::BlockManagerMetrics(opts_.metric_entity));
   }
@@ -724,8 +729,9 @@ FileBlockManager::FileBlockManager(Env* env,
 FileBlockManager::~FileBlockManager() {
 }
 
-Status FileBlockManager::Open(FsReport* report, std::atomic<int>* 
containers_processed,
-                              std::atomic<int>* containers_total) {
+Status FileBlockManager::Open(FsReport* report, MergeReport need_merage,
+                              std::atomic<int>* /* containers_processed */,
+                              std::atomic<int>* /* containers_total */) {
   // Prepare the filesystem report and either return or log it.
   FsReport local_report;
   set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -745,7 +751,11 @@ Status FileBlockManager::Open(FsReport* report, 
std::atomic<int>* containers_pro
     local_report.data_dirs.push_back(dd->dir());
   }
   if (report) {
-    *report = std::move(local_report);
+    if (need_merage == MergeReport::REQUIRED) {
+      report->MergeFrom(local_report);
+    } else {
+      *report = std::move(local_report);
+    }
   } else {
     RETURN_NOT_OK(local_report.LogAndCheckForFatalErrors());
   }
@@ -758,7 +768,9 @@ Status FileBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
 
   Dir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
-      
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, 
opts.tablet_id));
+      
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
+                                             opts.tablet_id,
+                                             tenant_id()));
   int uuid_idx;
   CHECK(dd_manager_->FindUuidIndexByDir(dir, &uuid_idx));
 
@@ -794,7 +806,8 @@ Status FileBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
     // no point in doing so. On disk failure, the tablet specified by 'opts'
     // will be shut down, so the returned block would not be used.
     RETURN_NOT_OK_HANDLE_DISK_FAILURE(s,
-        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir));
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+                                               dir, tenant_id()));
     WritableFileOptions wr_opts;
     wr_opts.mode = Env::MUST_CREATE;
     wr_opts.is_sensitive = true;
@@ -815,7 +828,8 @@ Status FileBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
     block->reset(new internal::FileWritableBlock(this, location, writer));
   } else {
     HANDLE_DISK_FAILURE(s,
-        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir));
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+                                               dir, tenant_id()));
     return s;
   }
   return Status::OK();
@@ -824,8 +838,9 @@ Status FileBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
 #define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, \
-      dd_manager_->FindDirByUuidIndex( \
-      internal::FileBlockLocation::GetDirIdx(block_id)))); \
+          dd_manager_->FindDirByUuidIndex( \
+              internal::FileBlockLocation::GetDirIdx(block_id)), \
+          tenant_id())); \
 } while (0)
 
 Status FileBlockManager::OpenBlock(const BlockId& block_id,
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 01ba46e77..d770e2386 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -26,7 +26,9 @@
 #include <vector>
 
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/error_manager.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/random.h"
@@ -41,7 +43,6 @@ class MemTracker;
 
 namespace fs {
 class DataDirManager;
-class FsErrorManager;
 struct FsReport;
 
 namespace internal {
@@ -73,14 +74,16 @@ class FileBlockManager : public BlockManager {
   // Note: all objects passed as pointers should remain alive for the lifetime
   // of the block manager.
   FileBlockManager(Env* env,
-                   DataDirManager* dd_manager,
-                   FsErrorManager* error_manager,
+                   scoped_refptr<DataDirManager> dd_manager,
+                   scoped_refptr<FsErrorManager> error_manager,
                    FileCache* file_cache,
-                   BlockManagerOptions opts);
+                   BlockManagerOptions opts,
+                   std::string tenant_id);
 
   ~FileBlockManager() override;
 
-  Status Open(FsReport* report, std::atomic<int>* containers_processed,
+  Status Open(FsReport* report, MergeReport need_merage,
+              std::atomic<int>* containers_processed,
               std::atomic<int>* containers_total) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
@@ -97,7 +100,9 @@ class FileBlockManager : public BlockManager {
 
   void NotifyBlockId(BlockId block_id) override;
 
-  FsErrorManager* error_manager() override { return error_manager_; }
+  scoped_refptr<FsErrorManager> error_manager() override { return 
error_manager_; }
+
+  std::string tenant_id() const override { return tenant_id_; }
 
  private:
   friend class internal::FileBlockDeletionTransaction;
@@ -128,10 +133,10 @@ class FileBlockManager : public BlockManager {
 
   // Manages and owns the data directories in which the block manager will
   // place its blocks.
-  DataDirManager* dd_manager_;
+  scoped_refptr<DataDirManager> dd_manager_;
 
   // Manages callbacks used to handle disk failure.
-  FsErrorManager* error_manager_;
+  scoped_refptr<FsErrorManager> error_manager_;
 
   // The options that the FileBlockManager was created with.
   const BlockManagerOptions opts_;
@@ -158,6 +163,9 @@ class FileBlockManager : public BlockManager {
   // interesting.
   std::shared_ptr<MemTracker> mem_tracker_;
 
+  // Which tenant this file block manager belongs to.
+  std::string tenant_id_;
+
   DISALLOW_COPY_AND_ASSIGN(FileBlockManager);
 };
 
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 2c380bedb..63923bfa0 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -26,6 +26,7 @@
 #include <iostream>
 #include <iterator>
 #include <memory>
+#include <optional>
 #include <set>
 #include <string>
 #include <tuple>
@@ -41,11 +42,13 @@
 
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/default_key_provider.h"
 #include "kudu/fs/dir_manager.h"
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -64,6 +67,7 @@
 
 using kudu::pb_util::ReadPBContainerFromPath;
 using kudu::pb_util::SecureDebugString;
+using std::nullopt;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -84,8 +88,27 @@ DECLARE_bool(enable_multi_tenancy);
 namespace kudu {
 namespace fs {
 
+static constexpr const char* const kTenantSelectors[] = {
+  "00000000000000000000000000000000", // "default_tenant_kudu"
+  "00000000000000000000000000000001", // "test_tenant_kudu"
+};
+
+static constexpr const char* const kEncryptionType[] = {
+  "kNonEncryption",         // FLAGS_encrypt_data_at_rest = false
+                            // FLAGS_enable_multi_tenancy = false
+  "kServerEncryption",      // FLAGS_encrypt_data_at_rest = true
+                            // FLAGS_enable_multi_tenancy = false
+  "kMultiTenantEncryption", // FLAGS_encrypt_data_at_rest = true
+                            // FLAGS_enable_multi_tenancy = true
+};
+
+static constexpr const char* const kTestTenantName = "test_tenant_kudu";
+static constexpr const char* const kTestTenantKey = 
"00010203040506070809101112131442";
+static constexpr const char* const kTestTenantKeyIv = 
"42141312111009080706050403020100";
+static constexpr const char* const kTestTenantKeyVersion = "kudutenantkey@0";
+
 class FsManagerTestBase : public KuduTest,
-                          public testing::WithParamInterface<string> {
+                          public 
testing::WithParamInterface<std::tuple<string, const char*>> {
  public:
   FsManagerTestBase()
      : fs_root_(GetTestPath("fs_root")) {
@@ -93,12 +116,58 @@ class FsManagerTestBase : public KuduTest,
 
   void SetUp() override {
     KuduTest::SetUp();
-    FLAGS_block_manager = GetParam();
+    FLAGS_block_manager = std::get<0>(GetParam());
+    auto encryption_type = std::get<1>(GetParam());
+    if (encryption_type == kEncryptionType[0]) {
+      tenant_id_ = kTenantSelectors[0];
+      FLAGS_encrypt_data_at_rest = false;
+      FLAGS_enable_multi_tenancy = false;
+    } else if (encryption_type == kEncryptionType[1]) {
+      tenant_id_ = kTenantSelectors[0];
+      FLAGS_encrypt_data_at_rest = true;
+      FLAGS_enable_multi_tenancy = false;
+    } else if (encryption_type == kEncryptionType[2]) {
+      tenant_id_ = kTenantSelectors[1];
+      FLAGS_encrypt_data_at_rest = true;
+      FLAGS_enable_multi_tenancy = true;
+    } else {
+      ASSERT_TRUE(0);
+    }
 
     // Initialize File-System Layout
     ReinitFsManager();
     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     ASSERT_OK(fs_manager_->Open());
+    if (tenant_id() != kTenantSelectors[0]) {
+      // Register the test tenant when running as a non-default tenant.
+      ASSERT_OK(fs_manager_->AddTenant(kTestTenantName,
+                                       tenant_id(),
+                                       kTestTenantKey,
+                                       kTestTenantKeyIv,
+                                       kTestTenantKeyVersion));
+    }
+  }
+
+  Env* GetEnv() const {
+    // TODO(kedeng):
+    //    Different tenants should use their own environments, but currently
+    // the data loading patches for multi-tenants have not been merged, which
+    // results in ignoring tenant information when re-opening the FS manager.
+    // This method is temporarily used to ensure that the single test can run
+    // successfully, and the implementation here will need to be modified in
+    // the future.
+    return fs_manager()->GetEnv(kTenantSelectors[0]);
+  }
+
+  void AddNonDefaultTanent() {
+    if (tenant_id() != kTenantSelectors[0]) {
+      // Register the test tenant when running as a non-default tenant.
+      ASSERT_OK(fs_manager_->AddTenant(kTestTenantName,
+                                       tenant_id(),
+                                       kTestTenantKey,
+                                       kTestTenantKeyIv,
+                                       kTestTenantKeyVersion));
+    }
   }
 
   void ReinitFsManager() {
@@ -122,36 +191,141 @@ class FsManagerTestBase : public KuduTest,
 
     // Test Write
     unique_ptr<WritableBlock> writer;
-    ASSERT_OK(fs_manager()->CreateNewBlock({}, &writer));
+    ASSERT_OK(fs_manager()->CreateNewBlock({}, &writer, tenant_id()));
     ASSERT_OK(writer->Append(data));
     ASSERT_OK(writer->Close());
 
     // Test Read
     Slice result(buffer, data.size());
     unique_ptr<ReadableBlock> reader;
-    ASSERT_OK(fs_manager()->OpenBlock(writer->id(), &reader));
+    ASSERT_OK(fs_manager()->OpenBlock(writer->id(), &reader, tenant_id()));
     ASSERT_OK(reader->Read(0, result));
     ASSERT_EQ(data, result);
   }
 
   FsManager *fs_manager() const { return fs_manager_.get(); }
 
+  const string& tenant_id() const { return tenant_id_; }
+
  protected:
   const string fs_root_;
 
  private:
   unique_ptr<FsManager> fs_manager_;
+  string tenant_id_;
 };
+
 INSTANTIATE_TEST_SUITE_P(BlockManagerTypes, FsManagerTestBase,
-                         
::testing::ValuesIn(BlockManager::block_manager_types()));
+    ::testing::Combine(
+        ::testing::ValuesIn(BlockManager::block_manager_types()),
+        ::testing::ValuesIn(kEncryptionType)));
 
 TEST_P(FsManagerTestBase, TestBaseOperations) {
-  fs_manager()->DumpFileSystemTree(std::cout);
+  fs_manager()->DumpFileSystemTree(std::cout, tenant_id());
 
   TestReadWriteDataFile(Slice("test0"));
   TestReadWriteDataFile(Slice("test1"));
 
-  fs_manager()->DumpFileSystemTree(std::cout);
+  fs_manager()->DumpFileSystemTree(std::cout, tenant_id());
+}
+
+TEST_P(FsManagerTestBase, TestTenantAccountOperation) {
+  int tenant_num = fs_manager()->tenants_count();
+  const string non_exist_tenant_name = "non_exist_tenant_name";
+  const string non_exist_tenant = "10000000000000000000000000000000";
+  const string default_tenant_id = "00000000000000000000000000000000";
+  auto encryption_type = std::get<1>(GetParam());
+  if (encryption_type != kEncryptionType[2]) {
+    if (encryption_type == kEncryptionType[0]) {
+      // Multi-tenancy is disabled.
+      ASSERT_FALSE(FLAGS_enable_multi_tenancy);
+      ASSERT_FALSE(FLAGS_encrypt_data_at_rest);
+    } else if (encryption_type == kEncryptionType[1]) {
+      // Multi-tenancy is disabled but data at rest encryption is enabled.
+      ASSERT_FALSE(FLAGS_enable_multi_tenancy);
+      ASSERT_TRUE(FLAGS_encrypt_data_at_rest);
+    }
+    ASSERT_EQ(0, tenant_num);
+    ASSERT_FALSE(fs_manager()->is_tenants_exist());
+
+    // Adding a tenant is not allowed when multi-tenancy is disabled.
+    ASSERT_DEATH(fs_manager()->AddTenant(non_exist_tenant_name, 
non_exist_tenant,
+                                         nullopt, nullopt, nullopt), "");
+    ASSERT_FALSE(fs_manager()->VertifyTenant(non_exist_tenant));
+    ASSERT_TRUE(fs_manager()->VertifyTenant(default_tenant_id));
+    ASSERT_EQ(0, fs_manager()->GetDataRootDirs(non_exist_tenant).size());
+    ASSERT_NE(0, fs_manager()->GetDataRootDirs(default_tenant_id).size());
+    ASSERT_FALSE(fs_manager()->block_manager(non_exist_tenant));
+    ASSERT_TRUE(fs_manager()->block_manager(default_tenant_id));
+
+    // Removing a tenant is not allowed when multi-tenancy is disabled.
+    const auto s = fs_manager()->RemoveTenant(non_exist_tenant);
+    ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+        Substitute("Not support for removing tenant for id: $0.", 
non_exist_tenant));
+  } else {
+    // Multi-tenancy is enabled.
+    ASSERT_TRUE(FLAGS_enable_multi_tenancy);
+    ASSERT_TRUE(FLAGS_encrypt_data_at_rest);
+    ASSERT_EQ(2, tenant_num);
+    ASSERT_TRUE(fs_manager()->is_tenants_exist());
+    for (const auto& tenant : kTenantSelectors) {
+      ASSERT_TRUE(fs_manager()->is_tenant_exist(tenant));
+      ASSERT_TRUE(fs_manager()->VertifyTenant(tenant));
+      // Re-adding a tenant that already exists must fail.
+      string new_tenant = "new_tenant_name";
+      const auto s = fs_manager()->AddTenant(new_tenant, tenant, nullopt,
+                                             nullopt, nullopt);
+      ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(),
+          Substitute("Tenant $0 already exists.", tenant));
+    }
+
+    ASSERT_FALSE(fs_manager()->is_tenant_exist(non_exist_tenant));
+
+    // Test add tenant.
+    string new_tenant_name = "new_tenant_name";
+    string new_tenant = "00000000000000000000000000000011";
+    // Make sure the new tenant does not exist.
+    ASSERT_FALSE(fs_manager()->is_tenant_exist(new_tenant));
+    // Generate the key info needed to initialize the new tenant.
+    security::DefaultKeyProvider key_provider;
+    string encrypted_key;
+    string iv;
+    string version;
+    ASSERT_OK(key_provider.GenerateEncryptionKey(&encrypted_key, &iv, 
&version));
+    ASSERT_OK(fs_manager()->AddTenant(new_tenant_name, new_tenant,
+                                      encrypted_key, iv, version));
+
+    // The key info we read back must equal what we set.
+    ASSERT_EQ(new_tenant_name, fs_manager()->tenant_name(new_tenant));
+    ASSERT_EQ(encrypted_key, fs_manager()->tenant_key(new_tenant));
+    ASSERT_EQ(iv, fs_manager()->tenant_key_iv(new_tenant));
+    ASSERT_EQ(version, fs_manager()->tenant_key_version(new_tenant));
+
+    // The new tenant must exist after 'AddTenant'.
+    ASSERT_TRUE(fs_manager()->is_tenant_exist(new_tenant));
+    ASSERT_EQ(3, fs_manager()->tenants_count());
+    for (auto& tenant : fs_manager()->GetAllTenants()) {
+      ASSERT_TRUE(fs_manager()->is_tenant_exist(tenant));
+      ASSERT_TRUE(fs_manager()->VertifyTenant(tenant));
+      ASSERT_NE(0, fs_manager()->GetDataRootDirs(tenant).size());
+    }
+
+    // Test remove tenant.
+    ASSERT_TRUE(fs_manager()->is_tenant_exist(new_tenant));
+    ASSERT_OK(fs_manager()->RemoveTenant(new_tenant));
+    ASSERT_FALSE(fs_manager()->is_tenant_exist(new_tenant));
+    ASSERT_EQ(2, fs_manager()->tenants_count());
+    for (auto& tenant : fs_manager()->GetAllTenants()) {
+      ASSERT_TRUE(fs_manager()->is_tenant_exist(tenant));
+    }
+
+    // Removing the default tenant is not allowed.
+    const auto s = fs_manager()->RemoveTenant(default_tenant_id);
+    ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Remove default tenant is not allowed.");
+  }
 }
 
 TEST_P(FsManagerTestBase, TestIllegalPaths) {
@@ -181,8 +355,10 @@ TEST_P(FsManagerTestBase, TestDuplicatePaths) {
   string path = GetTestPath("foo");
   ReinitFsManagerWithPaths(path, { path, path, path });
   ASSERT_OK(fs_manager()->CreateInitialFileSystemLayout());
+  ASSERT_OK(fs_manager()->Open());
+  NO_FATALS(AddNonDefaultTanent());
   ASSERT_EQ(vector<string>({ JoinPathSegments(path, 
fs_manager()->kDataDirName) }),
-            fs_manager()->GetDataRootDirs());
+            fs_manager()->GetDataRootDirs(tenant_id()));
 }
 
 TEST_P(FsManagerTestBase, TestListTablets) {
@@ -192,21 +368,21 @@ TEST_P(FsManagerTestBase, TestListTablets) {
 
   string path = fs_manager()->GetTabletMetadataDir();
   unique_ptr<WritableFile> writer;
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, "foo.kudutmp"), &writer));
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, "foo.kudutmp.abc123"), &writer));
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, "foo.bak"), &writer));
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, "foo.bak.abc123"), &writer));
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, ".hidden"), &writer));
   // An uncanonicalized id.
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, "6ba7b810-9dad-11d1-80b4-00c04fd430c8"), 
&writer));
   // 1 valid tablet id.
-  ASSERT_OK(env_->NewWritableFile(
+  ASSERT_OK(GetEnv()->NewWritableFile(
       JoinPathSegments(path, "922ff7ed14c14dbca4ee16331dfda42a"), &writer));
 
   ASSERT_OK(fs_manager()->ListTabletIds(&tablet_ids));
@@ -215,10 +391,10 @@ TEST_P(FsManagerTestBase, TestListTablets) {
 
 TEST_P(FsManagerTestBase, TestCannotUseNonEmptyFsRoot) {
   string path = GetTestPath("new_fs_root");
-  ASSERT_OK(env_->CreateDir(path));
+  ASSERT_OK(GetEnv()->CreateDir(path));
   {
     unique_ptr<WritableFile> writer;
-    ASSERT_OK(env_->NewWritableFile(
+    ASSERT_OK(GetEnv()->NewWritableFile(
         JoinPathSegments(path, "some_file"), &writer));
   }
 
@@ -236,7 +412,7 @@ TEST_P(FsManagerTestBase, TestEmptyWALPath) {
 
 TEST_P(FsManagerTestBase, TestOnlyWALPath) {
   string path = GetTestPath("new_fs_root");
-  ASSERT_OK(env_->CreateDir(path));
+  ASSERT_OK(GetEnv()->CreateDir(path));
 
   ReinitFsManagerWithPaths(path, {});
   ASSERT_OK(fs_manager()->CreateInitialFileSystemLayout());
@@ -332,8 +508,8 @@ TEST_P(FsManagerTestBase, TestMetadataDirInDataRoot) {
   Status s = fs_manager()->Open();
   ASSERT_STR_CONTAINS(s.ToString(), "could not verify required directory");
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-  ASSERT_FALSE(env_->FileExists(opts.data_roots[0]));
-  ASSERT_TRUE(env_->FileExists(opts.data_roots[1]));
+  ASSERT_FALSE(GetEnv()->FileExists(opts.data_roots[0]));
+  ASSERT_TRUE(GetEnv()->FileExists(opts.data_roots[1]));
 
   // Now allow the reordering by specifying the expected metadata root.
   opts.metadata_root = opts.data_roots[1];
@@ -414,7 +590,7 @@ TEST_P(FsManagerTestBase, TestCreateWithFailedDirs) {
   // Create some top-level paths to place roots in.
   vector<string> data_paths = { GetTestPath("data1"), GetTestPath("data2"), 
GetTestPath("data3") };
   for (const string& path : data_paths) {
-    env_->CreateDir(path);
+    GetEnv()->CreateDir(path);
   }
   // Initialize the FS layout with roots in subdirectories of data_paths. When
   // we canonicalize paths, we canonicalize the dirname of each path (e.g.
@@ -440,18 +616,18 @@ TEST_P(FsManagerTestBase, 
TestOpenWithDuplicateInstanceFiles) {
   WritableFileOptions wr_opts;
   wr_opts.mode = Env::MUST_CREATE;
   const string duplicate_test_root = GetTestPath("fs_dup");
-  ASSERT_OK(env_->CreateDir(duplicate_test_root));
+  ASSERT_OK(GetEnv()->CreateDir(duplicate_test_root));
   const string duplicate_instance = JoinPathSegments(
       duplicate_test_root, FsManager::kInstanceMetadataFileName);
-  ASSERT_OK(env_util::CopyFile(env_, 
fs_manager()->GetInstanceMetadataPath(fs_root_),
+  ASSERT_OK(env_util::CopyFile(GetEnv(), 
fs_manager()->GetInstanceMetadataPath(fs_root_),
                                duplicate_instance, wr_opts));
 
   // Make a copy of the per-directory instance file.
   const string duplicate_test_dir = JoinPathSegments(duplicate_test_root, 
kDataDirName);
-  ASSERT_OK(env_->CreateDir(duplicate_test_dir));
+  ASSERT_OK(GetEnv()->CreateDir(duplicate_test_dir));
   const string duplicate_dir_instance = JoinPathSegments(
       duplicate_test_dir, kInstanceMetadataFileName);
-  ASSERT_OK(env_util::CopyFile(env_,
+  ASSERT_OK(env_util::CopyFile(GetEnv(),
         fs_manager()->dd_manager()->FindDirByUuidIndex(0)->instance()->path(),
         duplicate_dir_instance, wr_opts));
 
@@ -543,7 +719,7 @@ TEST_P(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
   // empty disk and attempt to use it. Upon opening the FS layout, we should
   // see no failed directories.
   FLAGS_env_inject_eio = 0;
-  ASSERT_OK(env_->DeleteRecursively(new_root));
+  ASSERT_OK(GetEnv()->DeleteRecursively(new_root));
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
   ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
@@ -571,7 +747,7 @@ TEST_P(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
   // The above behavior should be seen if the data directories are missing...
   FLAGS_env_inject_eio = 0;
   for (const auto& root : opts.data_roots) {
-    ASSERT_OK(env_->DeleteRecursively(root));
+    ASSERT_OK(GetEnv()->DeleteRecursively(root));
   }
   ReinitFsManagerWithOpts(opts);
   s = fs_manager()->Open();
@@ -591,8 +767,8 @@ TEST_P(FsManagerTestBase, 
TestOpenWithCanonicalizationFailure) {
   // Create some parent directories and subdirectories.
   const string dir1 = GetTestPath("test1");
   const string dir2 = GetTestPath("test2");
-  ASSERT_OK(env_->CreateDir(dir1));
-  ASSERT_OK(env_->CreateDir(dir2));
+  ASSERT_OK(GetEnv()->CreateDir(dir1));
+  ASSERT_OK(GetEnv()->CreateDir(dir2));
   const string subdir1 = GetTestPath("test1/subdir");
   const string subdir2 = GetTestPath("test2/subdir");
   FsManagerOpts opts;
@@ -611,7 +787,7 @@ TEST_P(FsManagerTestBase, 
TestOpenWithCanonicalizationFailure) {
 
   // Now fail the canonicalization by deleting a parent directory. This
   // simulates the mountpoint disappearing.
-  ASSERT_OK(env_->DeleteRecursively(dir2));
+  ASSERT_OK(GetEnv()->DeleteRecursively(dir2));
   ReinitFsManagerWithOpts(opts);
   Status s = fs_manager()->Open();
   ASSERT_OK(s);
@@ -622,7 +798,7 @@ TEST_P(FsManagerTestBase, 
TestOpenWithCanonicalizationFailure) {
   }
 
   // Let's try that again, but with the appropriate mountpoint/directory.
-  ASSERT_OK(env_->CreateDir(dir2));
+  ASSERT_OK(GetEnv()->CreateDir(dir2));
   ReinitFsManagerWithOpts(opts);
   ASSERT_OK(fs_manager()->Open());
   ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
@@ -740,14 +916,14 @@ TEST_P(FsManagerTestBase, TestUmask) {
 TEST_P(FsManagerTestBase, TestOpenFailsWhenMissingImportantDir) {
   const string kWalRoot = fs_manager()->GetWalsRootDir();
 
-  ASSERT_OK(env_->DeleteDir(kWalRoot));
+  ASSERT_OK(GetEnv()->DeleteDir(kWalRoot));
   ReinitFsManager();
   Status s = fs_manager()->Open();
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "could not verify required directory");
 
   unique_ptr<WritableFile> f;
-  ASSERT_OK(env_->NewWritableFile(kWalRoot, &f));
+  ASSERT_OK(GetEnv()->NewWritableFile(kWalRoot, &f));
   s = fs_manager()->Open();
   ASSERT_TRUE(s.IsCorruption()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "exists but is not a directory");
@@ -807,7 +983,7 @@ TEST_P(FsManagerTestBase, TestEIOWhileChangingDirs) {
   for (int i = 0; i < kMaxDirs; i++) {
     const string dir = Substitute("$0$1", kTestPathBase, i);
     all_dirs.emplace_back(dir);
-    ASSERT_OK(env_->CreateDir(dir));
+    ASSERT_OK(GetEnv()->CreateDir(dir));
   }
   FsManagerOpts opts;
   opts.wal_root = all_dirs[0];
@@ -840,7 +1016,7 @@ TEST_P(FsManagerTestBase, 
TestEIOWhileRunningUpdateDirsTool) {
   // Helper to get a new root.
   auto create_root = [&] (int i) {
     const string dir = Substitute("$0$1", kTestPathBase, i);
-    CHECK_OK(env_->CreateDir(dir));
+    CHECK_OK(GetEnv()->CreateDir(dir));
     return dir;
   };
 
@@ -860,7 +1036,7 @@ TEST_P(FsManagerTestBase, 
TestEIOWhileRunningUpdateDirsTool) {
       // Collect the contents of the InstanceMetadataPB objects.
       const auto instance_path = JoinPathSegments(root, 
FsManager::kInstanceMetadataFileName);
       unique_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB);
-      Status s = ReadPBContainerFromPath(env_, instance_path, pb.get(), 
pb_util::NOT_SENSITIVE);
+      Status s = ReadPBContainerFromPath(GetEnv(), instance_path, pb.get(), 
pb_util::NOT_SENSITIVE);
       if (s.IsNotFound()) {
         InsertOrDie(&instances, instance_path, "");
       } else {
@@ -872,7 +1048,7 @@ TEST_P(FsManagerTestBase, 
TestEIOWhileRunningUpdateDirsTool) {
       unique_ptr<DirInstanceMetadataPB> bmi_pb(new DirInstanceMetadataPB);
       const auto block_manager_instance = JoinPathSegments(
           JoinPathSegments(root, kDataDirName), kInstanceMetadataFileName);
-      s = ReadPBContainerFromPath(env_, block_manager_instance,
+      s = ReadPBContainerFromPath(GetEnv(), block_manager_instance,
                                   bmi_pb.get(), pb_util::NOT_SENSITIVE);
       if (s.IsNotFound()) {
         InsertOrDie(&instances, block_manager_instance, "");
@@ -939,7 +1115,7 @@ TEST_P(FsManagerTestBase, TestReAddRemovedDataDir) {
     opts.data_roots = data_roots;
     ReinitFsManagerWithOpts(opts);
     ASSERT_OK(fs_manager()->Open());
-    DataDirManager* dd_manager = fs_manager()->dd_manager();
+    auto dd_manager = fs_manager()->dd_manager();
     ASSERT_EQ(data_roots.size(), dd_manager->GetDirs().size());
 
     // Since we haven't deleted any directories or instance files, ensure that
@@ -967,8 +1143,8 @@ TEST_P(FsManagerTestBase, 
TestCannotRemoveDataDirServingAsMetadataDir) {
 
   // Create a new fs layout with a metadata root explicitly set to the first
   // data root.
-  ASSERT_OK(env_->DeleteRecursively(fs_root_));
-  ASSERT_OK(env_->CreateDir(fs_root_));
+  ASSERT_OK(GetEnv()->DeleteRecursively(fs_root_));
+  ASSERT_OK(GetEnv()->CreateDir(fs_root_));
 
   FsManagerOpts opts;
   opts.data_roots = { JoinPathSegments(fs_root_, "data1"),
@@ -1127,14 +1303,14 @@ TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
       // into the new fs root) then retry.
       string source_instance = fs_manager()->GetInstanceMetadataPath(fs_root_);
       bool is_dir;
-      Status s = env_->IsDirectory(fs_root, &is_dir);
+      Status s = GetEnv()->IsDirectory(fs_root, &is_dir);
       if (s.ok()) {
         ASSERT_TRUE(is_dir);
         string new_instance = fs_manager()->GetInstanceMetadataPath(fs_root);
-        if (!env_->FileExists(new_instance)) {
+        if (!GetEnv()->FileExists(new_instance)) {
           WritableFileOptions wr_opts;
           wr_opts.mode = Env::MUST_CREATE;
-          ASSERT_OK(env_util::CopyFile(env_, source_instance, new_instance, 
wr_opts));
+          ASSERT_OK(env_util::CopyFile(GetEnv(), source_instance, 
new_instance, wr_opts));
           ReinitFsManagerWithOpts(fs_opts);
           open_status = fs_manager()->Open();
         }
@@ -1163,10 +1339,10 @@ TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
         string data_dir = JoinPathSegments(root, kDataDirName);
         string instance = JoinPathSegments(data_dir,
                                            kInstanceMetadataFileName);
-        ASSERT_TRUE(env_->FileExists(instance));
+        ASSERT_TRUE(GetEnv()->FileExists(instance));
         string copy = instance + kTmpInfix;
-        if (env_->FileExists(copy)) {
-          ASSERT_OK(env_->RenameFile(copy, instance));
+        if (GetEnv()->FileExists(copy)) {
+          ASSERT_OK(GetEnv()->RenameFile(copy, instance));
           repaired = true;
         }
       }
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 4a43c74a2..3017209fd 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -165,6 +165,7 @@ DECLARE_bool(enable_multi_tenancy);
 DECLARE_bool(encrypt_data_at_rest);
 DECLARE_int32(encryption_key_length);
 
+using kudu::fs::BlockManager;
 using kudu::fs::BlockManagerOptions;
 using kudu::fs::CreateBlockOptions;
 using kudu::fs::DataDirManager;
@@ -243,7 +244,20 @@ FsManager::FsManager(Env* env, FsManagerOpts opts)
   }
 }
 
-FsManager::~FsManager() {}
+FsManager::~FsManager() {
+  {
+    std::lock_guard<LockType> lock(ddm_lock_);
+    dd_manager_map_.clear();
+  }
+  {
+    std::lock_guard<LockType> lock(bm_lock_);
+    block_manager_map_.clear();
+  }
+  {
+    std::lock_guard<LockType> lock(env_lock_);
+    env_map_.clear();
+  }
+}
 
 void FsManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb 
cb) {
   error_manager_->SetErrorNotificationCb(e, std::move(cb));
@@ -375,20 +389,34 @@ Status FsManager::Init() {
   return Status::OK();
 }
 
-void FsManager::InitBlockManager() {
+// Returns the block manager registered for 'tenant_id', constructing and
+// registering one of the configured type ("file" or "log") if none exists yet.
+// An unknown block manager type is fatal.
+scoped_refptr<BlockManager> FsManager::InitBlockManager(const string& tenant_id) {
+  auto block_manager = SearchBlockManager(tenant_id);
+  if (block_manager) {
+    return block_manager;
+  }
+
   BlockManagerOptions bm_opts;
   bm_opts.metric_entity = opts_.metric_entity;
   bm_opts.parent_mem_tracker = opts_.parent_mem_tracker;
   bm_opts.read_only = opts_.read_only;
   if (opts_.block_manager_type == "file") {
-    block_manager_.reset(new FileBlockManager(
-        GetEnv(), dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts)));
+    block_manager.reset(new FileBlockManager(
+        GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
+        opts_.file_cache, std::move(bm_opts), tenant_id));
   } else if (opts_.block_manager_type == "log") {
-    block_manager_.reset(new LogBlockManagerNativeMeta(
-        GetEnv(), dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts)));
+    block_manager.reset(new LogBlockManagerNativeMeta(
+        GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
+        opts_.file_cache, std::move(bm_opts), tenant_id));
   } else {
     LOG(FATAL) << "Unknown block_manager_type: " << opts_.block_manager_type;
   }
+
+  {
+    // The lookup above and this insertion are not atomic, so another thread may
+    // have registered a manager for the same tenant in the meantime. emplace()
+    // keeps an existing entry; hand back whichever instance is actually in the
+    // map so that every caller shares a single manager per tenant.
+    std::lock_guard<LockType> lock(bm_lock_);
+    block_manager = block_manager_map_.emplace(tenant_id, block_manager).first->second;
+  }
+
+  return block_manager;
 }
 
 Status FsManager::PartialOpen(CanonicalizedRootsList* missing_roots) {
@@ -520,9 +548,9 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
     //
     // The priority of tenant key is higher than that of server key.
     RETURN_NOT_OK(
-        
key_provider_->DecryptEncryptionKey(this->tenant_key(fs::kDefaultTenantName),
-                                            
this->tenant_key_iv(fs::kDefaultTenantName),
-                                            
this->tenant_key_version(fs::kDefaultTenantName),
+        
key_provider_->DecryptEncryptionKey(this->tenant_key(fs::kDefaultTenantID),
+                                            
this->tenant_key_iv(fs::kDefaultTenantID),
+                                            
this->tenant_key_version(fs::kDefaultTenantID),
                                             &decrypted_key));
   } else if (!server_key().empty() && key_provider_) {
     // Just check whether the upgrade operation is needed for 
'--enable_multi_tenancy'.
@@ -547,17 +575,7 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
   }
 
   // Open the directory manager if it has not been opened already.
-  if (!dd_manager_) {
-    DataDirManagerOptions dm_opts;
-    dm_opts.metric_entity = opts_.metric_entity;
-    dm_opts.read_only = opts_.read_only;
-    dm_opts.dir_type = opts_.block_manager_type;
-    dm_opts.update_instances = opts_.update_instances;
-    LOG_TIMING(INFO, "opening directory manager") {
-      RETURN_NOT_OK(DataDirManager::OpenExisting(GetEnv(),
-          canonicalized_data_fs_roots_, dm_opts, &dd_manager_));
-    }
-  }
+  RETURN_NOT_OK(OpenDataDirManager());
 
   // Only clean temporary files after the data dir manager successfully opened.
   // This ensures that we were able to obtain the exclusive directory locks
@@ -569,8 +587,8 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
 
   // Set an initial error handler to mark data directories as failed.
   error_manager_->SetErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
-        this->dd_manager_->MarkDirFailedByUuid(uuid);
+      ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string& 
tenant_id) {
+        this->dd_manager(tenant_id)->MarkDirFailedByUuid(uuid);
       });
 
   // Finally, initialize and open the block manager if needed.
@@ -578,14 +596,11 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
     if (read_data_directories) {
       read_data_directories->Start();
     }
-    InitBlockManager();
-    LOG_TIMING(INFO, "opening block manager") {
-      if (opts_.block_manager_type == "file") {
-        RETURN_NOT_OK(block_manager_->Open(report, nullptr, nullptr));
-      } else {
-        RETURN_NOT_OK(block_manager_->Open(report, containers_processed, 
containers_total));
-      }
-    }
+    RETURN_NOT_OK(InitAndOpenBlockManager(report,
+                                          containers_processed,
+                                          containers_total,
+                                          fs::kDefaultTenantID,
+                                          
BlockManager::MergeReport::REQUIRED));
     if (read_data_directories) {
       read_data_directories->Stop();
       if (opts_.metric_entity && opts_.block_manager_type == "log") {
@@ -625,13 +640,89 @@ Status FsManager::Open(FsReport* report, Timer* 
read_instance_metadata_files,
   return Status::OK();
 }
 
+// Registers 'dd_manager' as the data dir manager of 'tenant_id'.
+//
+// Returns AlreadyPresent, leaving the map untouched, if a manager is already
+// registered under that id.
+Status FsManager::AddDataDirManager(scoped_refptr<DataDirManager> dd_manager,
+                                    const string& tenant_id) {
+  std::lock_guard<LockType> guard(ddm_lock_);
+  const scoped_refptr<DataDirManager> existing(FindPtrOrNull(dd_manager_map_, tenant_id));
+  if (!existing) {
+    dd_manager_map_[tenant_id] = std::move(dd_manager);
+    return Status::OK();
+  }
+  return Status::AlreadyPresent(Substitute("Tenant $0 already exists.", tenant_id));
+}
+
+// Returns the canonicalized data roots to use for the given tenant.
+//
+// TODO(kedeng):
+//   Different tenants should own different data storage paths, and the new solution
+// needs to be compatible with existing implementation details. Until that is in
+// place, every tenant (default or not) is handed the same list of roots.
+CanonicalizedRootsList FsManager::get_canonicalized_data_fs_roots(
+    const string& /*tenant_id*/) const {
+  return canonicalized_data_fs_roots_;
+}
+
+// Creates a brand-new DataDirManager for 'tenant_id' over the roots returned by
+// get_canonicalized_data_fs_roots() and registers it in the dd manager map.
+//
+// It is a fatal error (CHECK) to call this when the tenant already has a
+// registered manager.
+Status FsManager::CreateNewDataDirManager(const string& tenant_id) {
+  CHECK(!dd_manager(tenant_id));
+
+  DataDirManagerOptions dm_opts;
+  dm_opts.metric_entity = opts_.metric_entity;
+  dm_opts.read_only = opts_.read_only;
+  dm_opts.dir_type = opts_.block_manager_type;
+  dm_opts.tenant_id = tenant_id;
+
+  scoped_refptr<DataDirManager> ddm;
+  LOG_TIMING(INFO, "creating directory manager") {
+    // Keep the context prefix so a failure here stays distinguishable from the
+    // other errors surfaced while the file system layout is being created.
+    RETURN_NOT_OK_PREPEND(DataDirManager::CreateNew(GetEnv(tenant_id),
+        get_canonicalized_data_fs_roots(tenant_id), dm_opts, &ddm),
+                          "Unable to create directory manager");
+  }
+  return AddDataDirManager(std::move(ddm), tenant_id);
+}
+
+// Opens the existing on-disk data directories of 'tenant_id' and registers the
+// resulting DataDirManager. A no-op when the tenant already has one registered.
+Status FsManager::OpenDataDirManager(const string& tenant_id) {
+  // Nothing to do if the directory manager has been opened already.
+  if (dd_manager(tenant_id)) {
+    return Status::OK();
+  }
+
+  DataDirManagerOptions options;
+  options.metric_entity = opts_.metric_entity;
+  options.read_only = opts_.read_only;
+  options.dir_type = opts_.block_manager_type;
+  options.update_instances = opts_.update_instances;
+  options.tenant_id = tenant_id;
+
+  scoped_refptr<DataDirManager> opened;
+  LOG_TIMING(INFO, "opening directory manager") {
+    RETURN_NOT_OK(DataDirManager::OpenExisting(
+        GetEnv(tenant_id), get_canonicalized_data_fs_roots(tenant_id), options, &opened));
+  }
+  return AddDataDirManager(opened, tenant_id);
+}
+
+// Obtains the block manager of 'tenant_id' via InitBlockManager() and opens it.
+//
+// The container progress counters are forwarded only when the block manager
+// type is not "file"; the "file" type is opened with null counters.
+Status FsManager::InitAndOpenBlockManager(FsReport* report,
+                                          std::atomic<int>* containers_processed,
+                                          std::atomic<int>* containers_total,
+                                          const string& tenant_id,
+                                          BlockManager::MergeReport need_merage) {
+  auto bm = InitBlockManager(tenant_id);
+  DCHECK(bm);
+
+  const bool is_file_bm = opts_.block_manager_type == "file";
+  LOG_TIMING(INFO, "opening block manager") {
+    RETURN_NOT_OK(bm->Open(report, need_merage,
+                           is_file_bm ? nullptr : containers_processed,
+                           is_file_bm ? nullptr : containers_total));
+  }
+  return Status::OK();
+}
+
 void FsManager::CopyMetadata(
     unique_ptr<InstanceMetadataPB>* metadata) {
   shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
   (*metadata)->CopyFrom(*metadata_);
 }
 
-Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB> metadata) {
+Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB>& metadata) {
   // In the event of failure, rollback everything we changed.
   // <string, string>   <=>   <old instance file, backup instance file>
   unordered_map<string, string> changed_dirs;
@@ -691,6 +782,48 @@ void 
FsManager::UpdateMetadataFormatAndStampUnlock(InstanceMetadataPB* metadata)
   metadata->set_format_stamp(Substitute("Formatted at $0 on $1", time_str, 
hostname));
 }
 
+// Appends a tenant entry built from the arguments to a private copy of the
+// instance metadata, refreshes the copy's format stamp, and hands the copy to
+// UpdateMetadata().
+Status FsManager::AddTenantMetadata(const string& tenant_name,
+                                    const string& tenant_id,
+                                    const string& tenant_key,
+                                    const string& tenant_key_iv,
+                                    const string& tenant_key_version) {
+  // Snapshot the current metadata; CopyMetadata() takes the shared metadata
+  // lock for the duration of the copy.
+  unique_ptr<InstanceMetadataPB> updated(new InstanceMetadataPB);
+  CopyMetadata(&updated);
+
+  auto* entry = updated->add_tenants();
+  entry->set_tenant_name(tenant_name);
+  entry->set_tenant_id(tenant_id);
+  entry->set_tenant_key(tenant_key);
+  entry->set_tenant_key_iv(tenant_key_iv);
+  entry->set_tenant_key_version(tenant_key_version);
+  UpdateMetadataFormatAndStampUnlock(updated.get());
+
+  return UpdateMetadata(updated);
+}
+
+// Removes the entry of 'tenant_id' from a private copy of the instance
+// metadata, refreshes the copy's format stamp, and hands the copy to
+// UpdateMetadata().
+//
+// Returns NotFound if the tenant is unknown.
+Status FsManager::RemoveTenantMetadata(const string& tenant_id) {
+  if (!is_tenant_exist(tenant_id)) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+
+  unique_ptr<InstanceMetadataPB> metadata(new InstanceMetadataPB);
+  {
+    shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
+    metadata->CopyFrom(*metadata_);
+  }
+
+  // The existence check above and the copy are not performed atomically, so
+  // the entry may be gone from the snapshot. Decide from the snapshot itself
+  // instead of rewriting unchanged metadata and reporting success.
+  bool removed = false;
+  for (int i = 0; i < metadata->tenants_size(); i++) {
+    if (metadata->tenants(i).tenant_id() == tenant_id) {
+      metadata->mutable_tenants()->DeleteSubrange(i, 1);
+      removed = true;
+      break;
+    }
+  }
+  if (!removed) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+  UpdateMetadataFormatAndStampUnlock(metadata.get());
+
+  return UpdateMetadata(metadata);
+}
+
 Status FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
                                                 optional<string> tenant_name,
                                                 optional<string> tenant_id,
@@ -720,6 +853,7 @@ Status 
FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
   //
   // Files/directories created will NOT be synchronized to disk.
   InstanceMetadataPB metadata;
+  string tid = tenant_id ? *tenant_id : fs::kDefaultTenantID;
   RETURN_NOT_OK_PREPEND(CreateInstanceMetadata(std::move(uuid),
                                                std::move(tenant_name),
                                                std::move(tenant_id),
@@ -748,14 +882,7 @@ Status 
FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
   // Create the directory manager.
   //
   // All files/directories created will be synchronized to disk.
-  DataDirManagerOptions dm_opts;
-  dm_opts.metric_entity = opts_.metric_entity;
-  dm_opts.read_only = opts_.read_only;
-  LOG_TIMING(INFO, "creating directory manager") {
-    RETURN_NOT_OK_PREPEND(DataDirManager::CreateNew(
-        GetEnv(), canonicalized_data_fs_roots_, dm_opts, &dd_manager_),
-                          "Unable to create directory manager");
-  }
+  RETURN_NOT_OK(CreateNewDataDirManager(tid));
 
   if (FLAGS_enable_data_block_fsync) {
     // Files/directories created by the directory manager in the fs roots have
@@ -934,7 +1061,15 @@ const string& FsManager::server_key_version() const {
   return CHECK_NOTNULL(metadata_.get())->server_key_version();
 }
 
-const int32_t FsManager::tenants_count() const {
+// Returns true if 'tenant_id' may be used: the default tenant id always may;
+// any other id only when '--enable_multi_tenancy' is set and the tenant is
+// known to is_tenant_exist().
+bool FsManager::VertifyTenant(const std::string& tenant_id) const {
+  return tenant_id == fs::kDefaultTenantID ||
+         (FLAGS_enable_multi_tenancy && is_tenant_exist(tenant_id));
+}
+
+// Returns the number of tenant entries in the instance metadata, read while
+// holding the metadata lock in shared mode.
+int32 FsManager::tenants_count() const {
   shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
   return metadata_->tenants_size();
 }
@@ -1007,16 +1142,93 @@ string FsManager::tenant_key_version(const string& 
tenant_id) const {
   return tenant ? tenant->tenant_key_version() : string("");
 }
 
-Env* FsManager::GetEnv(const std::string& tenant_id) {
+// Registers a new tenant: persists its metadata, then sets up the tenant's env,
+// encryption key, data dir manager and block manager, in that order.
+//
+// Any of 'tenant_key', 'tenant_key_iv' and 'tenant_key_version' may be omitted.
+// If at least one is missing and a key provider is configured, all three are
+// (re)generated by the provider; without a key provider the missing pieces are
+// recorded as empty strings.
+//
+// Requires '--enable_multi_tenancy' (CHECK). Returns AlreadyPresent if
+// 'tenant_id' is already known. Note that the metadata update is not rolled
+// back if one of the later steps fails.
+Status FsManager::AddTenant(const string& tenant_name,
+                            const string& tenant_id,
+                            optional<string> tenant_key,
+                            optional<string> tenant_key_iv,
+                            optional<string> tenant_key_version) {
+  CHECK(FLAGS_enable_multi_tenancy);
+  if (is_tenant_exist(tenant_id)) {
+    return Status::AlreadyPresent(Substitute("Tenant $0 already exists.", tenant_id));
+  }
+
+  // Materialize every missing piece as an empty string up front. The optionals
+  // are dereferenced below, both as output slots for the key provider and as
+  // inputs to the metadata update, and dereferencing a disengaged optional is
+  // undefined behavior.
+  const bool key_info_incomplete = !tenant_key || !tenant_key_iv || !tenant_key_version;
+  if (!tenant_key) {
+    tenant_key.emplace();
+  }
+  if (!tenant_key_iv) {
+    tenant_key_iv.emplace();
+  }
+  if (!tenant_key_version) {
+    tenant_key_version.emplace();
+  }
+
+  if (key_info_incomplete && key_provider_) {
+    // Generate tenant key info if missing something of tenant.
+    RETURN_NOT_OK_PREPEND(key_provider_->GenerateTenantKey(tenant_id,
+                                                           &(*tenant_key),
+                                                           &(*tenant_key_iv),
+                                                           &(*tenant_key_version)),
+                          Substitute("Failed to generate encrypted tenant key for tenant: $0.",
+                                     tenant_id));
+  }
+
+  // Update the metadata.
+  RETURN_NOT_OK_PREPEND(AddTenantMetadata(tenant_name,
+                                          tenant_id,
+                                          *tenant_key,
+                                          *tenant_key_iv,
+                                          *tenant_key_version),
+                        Substitute("Fail to update metadata for add tenant: $0.", tenant_id));
+
+  // Make sure env is available for create dd manager.
+  if (!AddEnv(tenant_id)) {
+    return Status::Corruption(Substitute("Fail to add env for tenant with id: $0.", tenant_id));
+  }
+  RETURN_NOT_OK_PREPEND(SetEncryptionKey(tenant_id),
+                        Substitute("Unable to set encryption key for tenant: $0.", tenant_id));
+
+  // Create new dd manager and add the new dd manager to the dd manager map.
+  //
+  // TODO(kedeng):
+  //     The new tenant should have its own dd manager instead of sharing the default tenant's
+  // dd manager. This needs to be implemented along with having different storage paths for
+  // different tenants.
+  RETURN_NOT_OK_PREPEND(OpenDataDirManager(tenant_id),
+                        Substitute("Unable to create and open data dir manager for tenant: $0.",
+                                   tenant_id));
+
+  // Create new block manager and add the new block manager to the block manager map.
+  RETURN_NOT_OK_PREPEND(InitAndOpenBlockManager(nullptr,
+                                                nullptr,
+                                                nullptr,
+                                                tenant_id),
+                        Substitute("Unable to open block manager for tenant: $0.", tenant_id));
+
+  return Status::OK();
+}
+
+// Removes the metadata entry of 'tenant_id'.
+//
+// Returns NotSupported for the default tenant and for ids rejected by
+// VertifyTenant().
+Status FsManager::RemoveTenant(const string& tenant_id) {
+  // The default tenant always passes VertifyTenant(), so testing for it up
+  // front yields the same outcome as testing for it afterwards.
+  if (tenant_id == fs::kDefaultTenantID) {
+    return Status::NotSupported("Remove default tenant is not allowed.");
+  }
+
+  if (VertifyTenant(tenant_id)) {
+    return RemoveTenantMetadata(tenant_id);
+  }
+
+  return Status::NotSupported(
+      Substitute("Not support for removing tenant for id: $0.", tenant_id));
+}
+
+// Returns the id of every tenant entry in the instance metadata, in metadata
+// order. The metadata lock is held in shared mode while the ids are copied.
+vector<string> FsManager::GetAllTenants() const {
+  shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
+
+  vector<string> ids;
+  ids.reserve(metadata_->tenants_size());
+  for (const auto& tenant : metadata_->tenants()) {
+    ids.emplace_back(tenant.tenant_id());
+  }
+  return ids;
+}
+
+Env* FsManager::AddEnv(const std::string& tenant_id) {
   if (tenant_id == fs::kDefaultTenantID) {
     return env_;
   }
 
   std::lock_guard<LockType> lock(env_lock_);
   auto env = FindPtrOrNull(env_map_, tenant_id);
-  if (!env && !FLAGS_enable_multi_tenancy) {
-    LOG(ERROR) << "'--enable_multi_tenancy' is disable for tenant: " << 
tenant_id;
-    return nullptr;
+  if (env) {
+    return env.get();
   }
 
   // Create new env and add the new env to the env map.
@@ -1026,9 +1238,36 @@ Env* FsManager::GetEnv(const std::string& tenant_id) {
   return new_env.get();
 }
 
-vector<string> FsManager::GetDataRootDirs() const {
+// Returns the env serving 'tenant_id': 'env_' for the default tenant, otherwise
+// the env registered for that tenant in 'env_map_'.
+//
+// Logs an error and returns nullptr when no env is registered for the tenant;
+// this lookup never creates an env.
+Env* FsManager::GetEnv(const std::string& tenant_id) const {
+  if (tenant_id == fs::kDefaultTenantID) {
+    return env_;
+  }
+
+  std::lock_guard<LockType> lock(env_lock_);
+  auto env = FindPtrOrNull(env_map_, tenant_id);
+  if (env) {
+    return env.get();
+  }
+
+  // The quote belongs around the flag name, not around "The --flag".
+  LOG(ERROR) << "The '--enable_multi_tenancy' is " << FLAGS_enable_multi_tenancy
+             << " for tenant: " << tenant_id << ", and we fail to search the env.";
+  return nullptr;
+}
+
+// Looks up the data dir manager registered for 'tenant_id' while holding
+// 'ddm_lock_'. The result is null when nothing is registered for the tenant.
+scoped_refptr<DataDirManager> FsManager::dd_manager(const string& tenant_id) const {
+  std::lock_guard<LockType> guard(ddm_lock_);
+  return scoped_refptr<DataDirManager>(FindPtrOrNull(dd_manager_map_, tenant_id));
+}
+
+// Returns the data directories reported by the tenant's data dir manager.
+// Logs an error and returns an empty list if the tenant id is not valid or if
+// no data dir manager is registered for the tenant.
+vector<string> FsManager::GetDataRootDirs(const string& tenant_id) const {
+  if (!VertifyTenant(tenant_id)) {
+    LOG(ERROR) << "Unable to get data root dirs for non existing tenant: "
+               << tenant_id << ", exit.";
+    return {};
+  }
+  // A valid tenant id does not imply that a data dir manager has been
+  // registered for it, so guard the dereference below.
+  const auto ddm = dd_manager(tenant_id);
+  if (!ddm) {
+    LOG(ERROR) << "Unable to get data root dirs, no data dir manager for tenant: "
+               << tenant_id;
+    return {};
+  }
   // Get the data subdirectory for each data root.
-  return dd_manager_->GetDirs();
+  return ddm->GetDirs();
 }
 
 string FsManager::GetTabletMetadataDir() const {
@@ -1123,13 +1362,30 @@ void FsManager::CheckAndFixPermissions() {
   }
 }
 
+// Returns the block manager registered for 'tenant_id'.
+//
+// Logs an error and returns nullptr if VertifyTenant() rejects the id; the
+// result is also null when nothing is registered for a valid tenant.
+scoped_refptr<BlockManager> FsManager::block_manager(const std::string& tenant_id) const {
+  if (VertifyTenant(tenant_id)) {
+    return SearchBlockManager(tenant_id);
+  }
+
+  LOG(ERROR) << "Do AddTenant for " << tenant_id
+             << " first before calling 'block_manager()'.";
+  return nullptr;
+}
+
 // ==========================================================================
 //  Dump/Debug utils
 // ==========================================================================
 
-void FsManager::DumpFileSystemTree(ostream& out) {
+void FsManager::DumpFileSystemTree(ostream& out, const string& tenant_id) {
   DCHECK(initted_);
 
+  if (!VertifyTenant(tenant_id)) {
+    LOG(ERROR) << "Unable to dump file system tree for non existing tenant: "
+               << tenant_id << ", exit.";
+    return;
+  }
+
   for (const auto& root : canonicalized_all_fs_roots_) {
     if (!root.status.ok()) {
       continue;
@@ -1137,7 +1393,7 @@ void FsManager::DumpFileSystemTree(ostream& out) {
     out << "File-System Root: " << root.path << std::endl;
 
     vector<string> objects;
-    Status s = GetEnv()->GetChildren(root.path, &objects);
+    Status s = GetEnv(tenant_id)->GetChildren(root.path, &objects);
     if (!s.ok()) {
       LOG(ERROR) << "Unable to list the fs-tree: " << s.ToString();
       return;
@@ -1164,25 +1420,66 @@ void FsManager::DumpFileSystemTree(ostream& out, const 
string& prefix,
   }
 }
 
+// Decrypts the stored key of 'tenant_id' through the key provider and installs
+// it on the tenant's env.
+//
+// A no-op returning OK when the tenant is unknown or no key provider is
+// configured. Returns NotFound if the tenant has no registered env.
+Status FsManager::SetEncryptionKeyUnlock(const string& tenant_id) {
+  if (!is_tenant_exist(tenant_id) || !key_provider_) {
+    return Status::OK();
+  }
+
+  // GetEnv() returns nullptr when no env is registered for the tenant; report
+  // that instead of dereferencing a null pointer.
+  Env* env = GetEnv(tenant_id);
+  if (!env) {
+    return Status::NotFound(Substitute("$0: env not found for tenant", tenant_id));
+  }
+
+  // Set encryption key for tenant.
+  string tenant_key;
+  RETURN_NOT_OK(key_provider_->DecryptEncryptionKey(tenant_key_unlock(tenant_id),
+                                                    tenant_key_iv_unlock(tenant_id),
+                                                    tenant_key_version_unlock(tenant_id),
+                                                    &tenant_key));
+  // 'tenant_key' is a hexadecimal string and SetEncryptionKey expects bits
+  // (hex / 2 = bytes * 8 = bits).
+  const string key_bytes = a2b_hex(tenant_key);
+  env->SetEncryptionKey(reinterpret_cast<const uint8_t*>(key_bytes.c_str()),
+                        tenant_key.length() * 4);
+  return Status::OK();
+}
+
+// Locked entry point: holds the metadata lock in shared mode for the duration
+// of SetEncryptionKeyUnlock().
+Status FsManager::SetEncryptionKey(const string& tenant_id) {
+  shared_lock<rw_spinlock> metadata_guard(metadata_rwlock_.get_lock());
+  return SetEncryptionKeyUnlock(tenant_id);
+}
+
 // ==========================================================================
 //  Data read/write interfaces
 // ==========================================================================
 
-Status FsManager::CreateNewBlock(const CreateBlockOptions& opts, unique_ptr<WritableBlock>* block) {
+// Creates a new block through the block manager of 'tenant_id'.
+//
+// Returns NotFound if the tenant id is not valid or if no block manager is
+// registered for the tenant. Calling this on a read-only FsManager is fatal.
+Status FsManager::CreateNewBlock(const CreateBlockOptions& opts,
+                                 unique_ptr<WritableBlock>* block,
+                                 const string& tenant_id) {
+  if (!VertifyTenant(tenant_id)) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+
+  auto bm = block_manager(tenant_id);
   CHECK(!opts_.read_only);
-  DCHECK(block_manager_);
-  return block_manager_->CreateBlock(opts, block);
+  // A DCHECK is compiled out of release builds, which would leave a null
+  // dereference here for a valid tenant without a registered block manager.
+  if (!bm) {
+    return Status::NotFound(Substitute("$0: block manager not found for tenant", tenant_id));
+  }
+  return bm->CreateBlock(opts, block);
 }
 
-Status FsManager::OpenBlock(const BlockId& block_id, unique_ptr<ReadableBlock>* block) {
-  DCHECK(block_manager_);
-  return block_manager_->OpenBlock(block_id, block);
+// Opens 'block_id' for reading through the block manager of 'tenant_id'.
+//
+// Returns NotFound if the tenant id is not valid or if no block manager is
+// registered for the tenant.
+Status FsManager::OpenBlock(const BlockId& block_id,
+                            unique_ptr<ReadableBlock>* block,
+                            const string& tenant_id) {
+  if (!VertifyTenant(tenant_id)) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+
+  auto bm = block_manager(tenant_id);
+  // A DCHECK is compiled out of release builds, which would leave a null
+  // dereference here for a valid tenant without a registered block manager.
+  if (!bm) {
+    return Status::NotFound(Substitute("$0: block manager not found for tenant", tenant_id));
+  }
+  return bm->OpenBlock(block_id, block);
 }
 
-bool FsManager::BlockExists(const BlockId& block_id) const {
-  DCHECK(block_manager_);
+// Returns true if 'block_id' can be opened through the block manager of
+// 'tenant_id'. Returns false if the tenant id is not valid, if no block manager
+// is registered for the tenant, or if opening the block fails.
+bool FsManager::BlockExists(const BlockId& block_id, const string& tenant_id) {
+  if (!VertifyTenant(tenant_id)) {
+    return false;
+  }
+
+  auto bm = block_manager(tenant_id);
+  // Without a block manager the block cannot exist as far as this tenant is
+  // concerned; a DCHECK alone would be compiled out of release builds.
+  if (!bm) {
+    return false;
+  }
   unique_ptr<ReadableBlock> block;
-  return block_manager_->OpenBlock(block_id, &block).ok();
+  return bm->OpenBlock(block_id, &block).ok();
 }
 
 } // namespace kudu
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 0761823a9..2b0135d03 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -22,6 +22,7 @@
 #include <iosfwd>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <optional>
 #include <string>
 #include <vector>
@@ -30,12 +31,16 @@
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/util/locks.h"          // for percpu_rwlock
 #include "kudu/util/env.h"
+#include "kudu/util/locks.h"          // for percpu_rwlock
 #include "kudu/util/metrics.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/path_util.h"
@@ -60,17 +65,13 @@ class Timer;
 
 namespace fs {
 
-class BlockManager;
-class DataDirManager;
 class FsManagerTestBase_TestDuplicatePaths_Test;
 class FsManagerTestBase_TestEIOWhileRunningUpdateDirsTool_Test;
 class FsManagerTestBase_TestIsolatedMetadataDir_Test;
 class FsManagerTestBase_TestMetadataDirInDataRoot_Test;
 class FsManagerTestBase_TestMetadataDirInWALRoot_Test;
 class FsManagerTestBase_TestOpenWithDuplicateInstanceFiles_Test;
-class ReadableBlock;
-class WritableBlock;
-struct CreateBlockOptions;
+class FsManagerTestBase_TestTenantAccountOperation_Test;
 struct FsReport;
 
 } // namespace fs
@@ -250,20 +251,30 @@ class FsManager {
   // ==========================================================================
 
   // Creates a new block based on the options specified in 'opts'.
+  // If the tenant id is not specified, we treat it as the default tenant.
   //
   // Block will be synced on close.
   Status CreateNewBlock(const fs::CreateBlockOptions& opts,
-                        std::unique_ptr<fs::WritableBlock>* block);
+                        std::unique_ptr<fs::WritableBlock>* block,
+                        const std::string& tenant_id = fs::kDefaultTenantID);
 
+  // If the tenant id is not specified, we treat it as the default tenant.
   Status OpenBlock(const BlockId& block_id,
-                   std::unique_ptr<fs::ReadableBlock>* block);
+                   std::unique_ptr<fs::ReadableBlock>* block,
+                   const std::string& tenant_id = fs::kDefaultTenantID);
 
-  bool BlockExists(const BlockId& block_id) const;
+  // If the tenant id is not specified, we treat it as the default tenant.
+  bool BlockExists(const BlockId& block_id,
+                   const std::string& tenant_id = fs::kDefaultTenantID);
 
   // ==========================================================================
   //  on-disk path
   // ==========================================================================
-  std::vector<std::string> GetDataRootDirs() const;
+  // Get the data subdirectories for each data root, which belong to the tenant
+  // specified by the tenant id.
+  // If the tenant id is not specified, we treat it as the default tenant.
+  std::vector<std::string> GetDataRootDirs(
+      const std::string& tenant_id = fs::kDefaultTenantID) const;
 
   std::string GetWalsRootDir() const {
     DCHECK(initted_);
@@ -304,10 +315,10 @@ class FsManager {
 
   // Get env to do read/write.
   // Different tenant owns different env.
-  // Create a new env for the tenant if search fail when 
'--enable_multi_tenancy' enabled.
+  // Returns nullptr if no env has been registered for a non-default tenant.
   //
   // If the tenant id is not specified, we treat it as the default tenant.
-  Env* GetEnv(const std::string& tenant_id = fs::kDefaultTenantID);
+  Env* GetEnv(const std::string& tenant_id = fs::kDefaultTenantID) const;
 
   bool read_only() const {
     return opts_.read_only;
@@ -325,8 +336,29 @@ class FsManager {
   //  tenant helpers
   // ==========================================================================
 
+  // A new tenant must have a tenant name and tenant ID. If the tenant key 
information
+  // is missing and the key generation service is available, we can use the 
relevant
+  // key generation service to generate the key information for it.
+  //
+  // The validation of tenant name and ID needs to be conducted on the master 
side.
+  Status AddTenant(const std::string& tenant_name,
+                   const std::string& tenant_id,
+                   std::optional<std::string> tenant_key,
+                   std::optional<std::string> tenant_key_iv,
+                   std::optional<std::string> tenant_key_version);
+
+  // As the tenant ID is globally unique and cannot be changed, while the 
tenant
+  // name can be changed, we use the tenant ID as the parameter to delete a 
tenant.
+  //
+  // TODO(kedeng):
+  //     All data owned by a tenant should be deleted when the tenant is 
removed.
+  Status RemoveTenant(const std::string& tenant_id);
+
+  // Get the ids of all tenants, including the default tenant.
+  std::vector<std::string> GetAllTenants() const;
+
   // Use to get the total count of all the tenants.
-  const int32_t tenants_count() const;
+  int32 tenants_count() const;
 
   // Use to confirm whether there is tenants information in metadata.
   bool is_tenants_exist() const;
@@ -374,25 +406,42 @@ class FsManager {
   // ==========================================================================
   //  file-system helpers
   // ==========================================================================
-  bool Exists(const std::string& path) const {
-    return env_->FileExists(path);
+  // Returns whether 'path' exists according to the env of the given tenant.
+  // Returns false if no env is registered for the tenant.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  bool Exists(const std::string& path,
+              const std::string& tenant_id = fs::kDefaultTenantID) const {
+    // GetEnv() yields nullptr for a tenant without a registered env.
+    Env* env = GetEnv(tenant_id);
+    return env != nullptr && env->FileExists(path);
   }
 
-  Status ListDir(const std::string& path, std::vector<std::string> *objects) const {
-    return env_->GetChildren(path, objects);
+  // Lists the children of 'path' into 'objects' using the env of the given
+  // tenant. Returns NotFound if no env is registered for the tenant.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  Status ListDir(const std::string& path,
+                 std::vector<std::string> *objects,
+                 const std::string& tenant_id = fs::kDefaultTenantID) const {
+    // GetEnv() yields nullptr for a tenant without a registered env.
+    Env* env = GetEnv(tenant_id);
+    if (!env) {
+      return Status::NotFound("no env registered for tenant", tenant_id);
+    }
+    return env->GetChildren(path, objects);
   }
 
-  fs::DataDirManager* dd_manager() const {
-    return dd_manager_.get();
-  }
+  // Search the tenant's dd manager.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  scoped_refptr<fs::DataDirManager> dd_manager(
+      const std::string& tenant_id = fs::kDefaultTenantID) const;
 
-  fs::BlockManager* block_manager() {
-    DCHECK(block_manager_);
-    return block_manager_.get();
-  }
+  // Get block manager by tenant id.
+  // Returns nullptr if the tenant id is not valid (see VertifyTenant()) or if no
+  // block manager has been registered for the tenant; call AddTenant() first.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  scoped_refptr<fs::BlockManager> block_manager(
+      const std::string& tenant_id = fs::kDefaultTenantID) const;
 
   // Prints the file system trees under the file system roots.
-  void DumpFileSystemTree(std::ostream& out);
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  void DumpFileSystemTree(std::ostream& out,
+                          const std::string& tenant_id = fs::kDefaultTenantID);
 
   bool meta_on_xfs() const {
     return meta_on_xfs_;
@@ -405,6 +454,7 @@ class FsManager {
   FRIEND_TEST(fs::FsManagerTestBase, TestMetadataDirInWALRoot);
   FRIEND_TEST(fs::FsManagerTestBase, TestMetadataDirInDataRoot);
   FRIEND_TEST(fs::FsManagerTestBase, TestOpenWithDuplicateInstanceFiles);
+  FRIEND_TEST(fs::FsManagerTestBase, TestTenantAccountOperation);
   FRIEND_TEST(tserver::MiniTabletServerTest, TestFsLayoutEndToEnd);
   friend class itest::MiniClusterFsInspector; // for access to directory names
   friend Status tools::UpdateEncryptionKeyInfo(Env* env); // for update the 
metadata
@@ -414,9 +464,19 @@ class FsManager {
   Status Init();
 
   // Select and create an instance of the appropriate block manager.
+  // Searches for the block manager corresponding to the tenant first; if none
+  // exists, creates a new one and then returns it.
   //
   // Does not actually perform any on-disk operations.
-  void InitBlockManager();
+  scoped_refptr<fs::BlockManager> InitBlockManager(
+      const std::string& tenant_id = fs::kDefaultTenantID);
+
+  // Looks up the block manager registered for 'tenant_id' while holding
+  // 'bm_lock_'. The result is null when nothing is registered for the tenant.
+  scoped_refptr<fs::BlockManager> SearchBlockManager(const std::string& tenant_id) const {
+    std::lock_guard<LockType> guard(bm_lock_);
+    return scoped_refptr<fs::BlockManager>(FindPtrOrNull(block_manager_map_, tenant_id));
+  }
 
   // Creates filesystem roots from 'canonicalized_roots', writing new on-disk
   // instances using 'metadata'.
@@ -444,9 +504,32 @@ class FsManager {
   Status WriteInstanceMetadata(const InstanceMetadataPB& metadata,
                                const std::string& root);
 
+  // To support multi-tenant scenarios and non-encrypted scenarios, some 
interfaces
+  // have added a tenant ID parameter.
+  // This interface is used to confirm the validity of the tenant ID. The 
default
+  // tenant ID is valid in any scenario, while a non-default tenant ID is only 
valid
+  // when multi-tenant features are enabled.
+  //
+  // If valid, it returns true, otherwise it returns false.
+  bool VertifyTenant(const std::string& tenant_id) const;
+
+  // We record tenant information in metadata and persist it to disk. When a new
+  // tenant is added, the on-disk records are updated first, and the in-memory
+  // metadata is updated only if that succeeds.
+  //
+  // Called only in the encryption enable scenario when a new tenant appears.
+  Status AddTenantMetadata(const std::string& tenant_name,
+                           const std::string& tenant_id,
+                           const std::string& tenant_key,
+                           const std::string& tenant_key_iv,
+                           const std::string& tenant_key_version);
+
+  // Remove the tenant recorded in the memory and the disk.
+  Status RemoveTenantMetadata(const std::string& tenant_id);
+
   // Update metadata after adding tenant or removing tenant.
   Status UpdateMetadata(
-    std::unique_ptr<InstanceMetadataPB> metadata);
+    std::unique_ptr<InstanceMetadataPB>& metadata);
 
   // Search tenant for metadata by tenant id.
   const InstanceMetadataPB_TenantMetadataPB* GetTenant(const std::string& 
tenant_id) const;
@@ -479,6 +562,24 @@ class FsManager {
   //  file-system helpers
   // ==========================================================================
 
+  // Create a new env for the tenant if the search fails when '--enable_multi_tenancy' is enabled.
+  Env* AddEnv(const std::string& tenant_id);
+
+  // Set encryption key for the env of tenant.
+  Status SetEncryptionKey(const std::string& tenant_id);
+  // Same as SetEncryptionKey(), except that the caller must hold md_lock_.
+  Status SetEncryptionKeyUnlock(const std::string& tenant_id);
+
+  // Create data dir manager for tenant and add it to dd manager map.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  Status CreateNewDataDirManager(const std::string& tenant_id = 
fs::kDefaultTenantID);
+
+  // Open data dir manager for tenant and add it to dd manager map.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  Status OpenDataDirManager(const std::string& tenant_id = 
fs::kDefaultTenantID);
+
   // Prints the file system tree for the objects in 'objects' under the given
   // 'path'. Prints lines with the given 'prefix'.
   void DumpFileSystemTree(std::ostream& out,
@@ -486,6 +587,23 @@ class FsManager {
                           const std::string& path,
                           const std::vector<std::string>& objects);
 
+  // Init and open block manager for tenant and add it to block manager map.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  Status InitAndOpenBlockManager(fs::FsReport* report = nullptr,
+                                 std::atomic<int>* containers_processed = 
nullptr,
+                                 std::atomic<int>* containers_total = nullptr,
+                                 const std::string& tenant_id = 
fs::kDefaultTenantID,
+                                 fs::BlockManager::MergeReport need_merage =
+                                     
fs::BlockManager::MergeReport::NOT_REQUIRED);
+
+  // Add data dir manager to the 'dd_manager_map_' keyed by tenant_id.
+  // Return 'AlreadyPresent' if the tenant is already present.
+  //
+  // If the tenant id is not specified, we treat it as the default tenant.
+  Status AddDataDirManager(scoped_refptr<fs::DataDirManager> dd_manager,
+                           const std::string& tenant_id = 
fs::kDefaultTenantID);
+
   // Deletes leftover temporary files in all "special" top-level directories
   // (e.g. WAL root directory).
   //
@@ -499,6 +617,9 @@ class FsManager {
   // Returns true if 'fname' is a valid tablet ID.
   bool IsValidTabletId(const std::string& fname);
 
+  // Different tenants should own different data storage paths.
+  CanonicalizedRootsList get_canonicalized_data_fs_roots(const std::string& 
tenant_id) const;
+
   static const char *kDataDirName;
   static const char *kTabletMetadataDirName;
   static const char *kWalDirName;
@@ -531,14 +652,24 @@ class FsManager {
   CanonicalizedRootsList canonicalized_data_fs_roots_;
   CanonicalizedRootsList canonicalized_all_fs_roots_;
 
-  // Lock protecting the 'metadata_' below.
+  // Lock protecting the 'metadata_'.
   mutable percpu_rwlock metadata_rwlock_;
   // Belongs to the default tenant.
   std::unique_ptr<InstanceMetadataPB> metadata_;
 
-  std::unique_ptr<fs::FsErrorManager> error_manager_;
-  std::unique_ptr<fs::DataDirManager> dd_manager_;
-  std::unique_ptr<fs::BlockManager> block_manager_;
+  // Shared by all the block managers.
+  scoped_refptr<fs::FsErrorManager> error_manager_;
+  // Lock protecting 'dd_manager_map_' below.
+  mutable LockType ddm_lock_;
+  typedef scoped_refptr<fs::DataDirManager> ScopedDDManagerPtr;
+  typedef std::map<std::string, ScopedDDManagerPtr> DataDirManagerMap;
+  DataDirManagerMap dd_manager_map_;
+
+  // Lock protecting 'block_manager_map_'.
+  mutable LockType bm_lock_;
+  typedef scoped_refptr<fs::BlockManager> ScopedBlockManagerPtr;
+  typedef std::map<std::string, ScopedBlockManagerPtr> BlockManagerMap;
+  BlockManagerMap block_manager_map_;
 
   std::unique_ptr<security::KeyProvider> key_provider_;
 
diff --git a/src/kudu/fs/log_block_manager-test.cc 
b/src/kudu/fs/log_block_manager-test.cc
index 11d910f46..665d12d1c 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -133,23 +133,26 @@ class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterfac
       // Use a small file cache (smaller than the number of containers).
       //
       // Not strictly necessary except for 
TestDeleteFromContainerAfterMetadataCompaction.
-      file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()),
-      bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+      file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()) {
     CHECK_OK(file_cache_.Init());
+    error_manager_ = make_scoped_refptr(new FsErrorManager());
+    bm_ = CreateBlockManager(scoped_refptr<MetricEntity>());
   }
 
   void SetUp() override {
     // Pass in a report to prevent the block manager from logging 
unnecessarily.
     FsReport report;
-    ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+    ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
     ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
     ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, 
&test_group_pb_));
   }
 
  protected:
-  LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>& 
metric_entity,
-                                      vector<string> test_data_dirs = {}) {
+  scoped_refptr<LogBlockManager> CreateBlockManager(
+      const scoped_refptr<MetricEntity>& metric_entity,
+      vector<string> test_data_dirs = {}) {
     PrepareDataDirs(&test_data_dirs);
+
     if (!dd_manager_) {
       // Ensure the directory manager is initialized.
       CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs,
@@ -159,8 +162,8 @@ class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterfac
     BlockManagerOptions opts;
     opts.metric_entity = metric_entity;
     CHECK_EQ(FLAGS_block_manager, "log");
-    return new LogBlockManagerNativeMeta(
-        env_, dd_manager_.get(), &error_manager_, &file_cache_, 
std::move(opts));
+    return make_scoped_refptr(new LogBlockManagerNativeMeta(env_, 
dd_manager_.get(), error_manager_,
+                              &file_cache_, std::move(opts), 
fs::kDefaultTenantID));
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = 
nullptr,
@@ -186,8 +189,8 @@ class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterfac
       RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, 
test_group_pb_));
     }
 
-    bm_.reset(CreateBlockManager(metric_entity, test_data_dirs));
-    RETURN_NOT_OK(bm_->Open(report, nullptr, nullptr));
+    bm_ = CreateBlockManager(metric_entity, test_data_dirs);
+    RETURN_NOT_OK(bm_->Open(report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
     return Status::OK();
   }
 
@@ -247,10 +250,10 @@ class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterfac
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
 
-  unique_ptr<DataDirManager> dd_manager_;
-  FsErrorManager error_manager_;
+  scoped_refptr<DataDirManager> dd_manager_;
+  scoped_refptr<FsErrorManager> error_manager_;
   FileCache file_cache_;
-  unique_ptr<LogBlockManager> bm_;
+  scoped_refptr<LogBlockManager> bm_;
 
  private:
   enum GetMode {
@@ -1908,11 +1911,12 @@ TEST_P(LogBlockManagerTest, 
TestOpenWithFailedDirectories) {
       DataDirManagerOptions(), &dd_manager_));
 
   // Wire in a callback to fail data directories.
-  error_manager_.SetErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
+    error_manager_->SetErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, [this](const string& uuid,
+                                           const string& /* tenant_id */) {
         this->dd_manager_->MarkDirFailedByUuid(uuid);
       });
-  bm_.reset(CreateBlockManager(nullptr));
+  bm_ = CreateBlockManager(nullptr);
 
   // Fail one of the directories, chosen randomly.
   FLAGS_crash_on_eio = false;
@@ -1922,7 +1926,7 @@ TEST_P(LogBlockManagerTest, 
TestOpenWithFailedDirectories) {
 
   // Check the report, ensuring the correct directory has failed.
   FsReport report;
-  ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+  ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
   ASSERT_EQ(kNumDirs - 1, report.data_dirs.size());
   for (const string& data_dir : report.data_dirs) {
     ASSERT_NE(data_dir, test_dirs[failed_idx]);
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index cc3ec7427..87acadc30 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -578,6 +578,8 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
     return read_only_status_;
   }
 
+  const std::string tenant_id() const { return block_manager_->tenant_id(); }
+
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
   const string& id() const { return id_; }
@@ -883,7 +885,7 @@ class LogBlockContainerNativeMeta final : public 
LogBlockContainer {
 #define CONTAINER_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
   HANDLE_DISK_FAILURE(s_, 
block_manager_->error_manager_->RunErrorNotificationCb( \
-      ErrorHandlerType::DISK_ERROR, data_dir_)); \
+      ErrorHandlerType::DISK_ERROR, data_dir_, tenant_id())); \
   WARN_NOT_OK(s_, msg); \
 } while (0)
 
@@ -961,7 +963,8 @@ LogBlockContainerNativeMeta::~LogBlockContainerNativeMeta() 
{
 void LogBlockContainer::HandleError(const Status& s) const {
   HANDLE_DISK_FAILURE(s,
       
block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
-                                                               data_dir_));
+                                                               data_dir_,
+                                                               tenant_id()));
 }
 
 void LogBlockContainerNativeMeta::CompactMetadata() {
@@ -1029,7 +1032,8 @@ void LogBlockContainerNativeMeta::CompactMetadata() {
 
 #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-    
block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
 dir)); \
+      block_manager->error_manager()->RunErrorNotificationCb( \
+          ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id())); \
 } while (0)
 
 Status LogBlockContainerNativeMeta::Create(LogBlockManager* block_manager,
@@ -1104,10 +1108,11 @@ Status 
LogBlockContainerNativeMeta::Create(LogBlockManager* block_manager,
   }
 
   // Prefer metadata status (arbitrarily).
-  FsErrorManager* em = block_manager->error_manager();
-  HANDLE_DISK_FAILURE(metadata_status,
-                      em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir));
-  HANDLE_DISK_FAILURE(data_status, 
em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
+  auto em = block_manager->error_manager();
+  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
+  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
   return !metadata_status.ok() ? metadata_status : data_status;
 }
 
@@ -2339,20 +2344,22 @@ const map<int64_t, int64_t> 
LogBlockManager::kPerFsBlockSizeBlockLimits({
   { 4096, 2721 }});
 
 LogBlockManager::LogBlockManager(Env* env,
-                                 DataDirManager* dd_manager,
-                                 FsErrorManager* error_manager,
+                                 scoped_refptr<DataDirManager> dd_manager,
+                                 scoped_refptr<FsErrorManager> error_manager,
                                  FileCache* file_cache,
-                                 BlockManagerOptions opts)
+                                 BlockManagerOptions opts,
+                                 string tenant_id)
   : env_(DCHECK_NOTNULL(env)),
-    dd_manager_(DCHECK_NOTNULL(dd_manager)),
-    error_manager_(DCHECK_NOTNULL(error_manager)),
+    dd_manager_(DCHECK_NOTNULL(std::move(dd_manager))),
+    error_manager_(DCHECK_NOTNULL(std::move(error_manager))),
     opts_(std::move(opts)),
     mem_tracker_(MemTracker::CreateTracker(-1,
                                            "log_block_manager",
                                            opts_.parent_mem_tracker)),
     file_cache_(file_cache),
     buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
-    next_block_id_(1) {
+    next_block_id_(1),
+    tenant_id_(std::move(tenant_id)) {
   managed_block_shards_.resize(kBlockMapChunk);
   for (auto& mb : managed_block_shards_) {
     mb.lock = unique_ptr<simple_spinlock>(new simple_spinlock());
@@ -2432,7 +2439,8 @@ Status LogBlockManagerNativeMeta::OpenContainer(Dir* dir,
   return LogBlockContainerNativeMeta::Open(this, dir, report, id, container);
 }
 
-Status LogBlockManager::Open(FsReport* report, std::atomic<int>* 
containers_processed,
+Status LogBlockManager::Open(FsReport* report, MergeReport need_merage,
+                             std::atomic<int>* containers_processed,
                              std::atomic<int>* containers_total) {
   // Establish (and log) block limits for each data directory using kernel,
   // filesystem, and gflags information.
@@ -2576,7 +2584,11 @@ Status LogBlockManager::Open(FsReport* report, 
std::atomic<int>* containers_proc
 
   // Either return or log the report.
   if (report) {
-    *report = std::move(merged_report);
+    if (need_merage == MergeReport::REQUIRED) {
+      report->MergeFrom(merged_report);
+    } else {
+      *report = std::move(merged_report);
+    }
   } else {
     RETURN_NOT_OK(merged_report.LogAndCheckForFatalErrors());
   }
@@ -2696,7 +2708,9 @@ Status LogBlockManager::GetOrCreateContainer(const 
CreateBlockOptions& opts,
                                              LogBlockContainerRefPtr* 
container) {
   Dir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
-      
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, 
opts.tablet_id));
+      
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
+                                             opts.tablet_id,
+                                             tenant_id()));
 
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -2715,7 +2729,8 @@ Status LogBlockManager::GetOrCreateContainer(const 
CreateBlockOptions& opts,
   // We could create a container in a different directory, but there's
   // currently no point in doing so. On disk failure, the tablet specified by
   // 'opts' will be shut down, so the returned container would not be used.
-  HANDLE_DISK_FAILURE(s, 
error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
+  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
   RETURN_NOT_OK_PREPEND(s, "Could not create new log block container at " + 
dir->dir());
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -2927,7 +2942,9 @@ Status LogBlockManager::RemoveLogBlock(const BlockId& 
block_id,
 
   LogBlockContainer* container = it->second->container();
   HANDLE_DISK_FAILURE(container->read_only_status(),
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
container->data_dir()));
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+                                             container->data_dir(),
+                                             tenant_id()));
 
   // Return early if deleting a block in a failed directory.
   set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -2958,7 +2975,7 @@ void LogBlockManager::OpenDataDir(
   Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
     HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
-        ErrorHandlerType::DISK_ERROR, dir));
+        ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
     *result_status = s.CloneAndPrepend(Substitute(
         "Could not list children of $0", dir->dir()));
     return;
@@ -3114,7 +3131,7 @@ void LogBlockManager::LoadContainer(Dir* dir,
     s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
     if (!s.ok()) {
       HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
-          ErrorHandlerType::DISK_ERROR, dir));
+          ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
       result->status = s.CloneAndPrepend(Substitute(
           "Could not get on-disk file size of container $0", 
container->ToString()));
       return;
@@ -3183,13 +3200,13 @@ void LogBlockManager::RepairTask(Dir* dir, 
internal::LogBlockContainerLoadResult
   Status s_ = (status_expr); \
   s_ = s_.CloneAndPrepend(msg); \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, \
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir)); \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir, tenant_id())); \
 } while (0)
 
 #define WARN_NOT_OK_LBM_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
   HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb( \
-      ErrorHandlerType::DISK_ERROR, dir)); \
+      ErrorHandlerType::DISK_ERROR, dir, tenant_id())); \
   WARN_NOT_OK(s_, msg); \
 } while (0)
 
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 830d1798f..0cc8d4d25 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -34,6 +34,8 @@
 
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -49,9 +51,7 @@ class Env;
 class FileCache;
 
 namespace fs {
-class DataDirManager;
 class Dir;
-class FsErrorManager;
 struct FsReport;
 
 namespace internal {
@@ -181,7 +181,8 @@ class LogBlockManager : public BlockManager {
 
   ~LogBlockManager() override;
 
-  Status Open(FsReport* report, std::atomic<int>* containers_processed,
+  Status Open(FsReport* report, MergeReport need_merage,
+              std::atomic<int>* containers_processed,
               std::atomic<int>* containers_total) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
@@ -198,16 +199,19 @@ class LogBlockManager : public BlockManager {
 
   void NotifyBlockId(BlockId block_id) override;
 
-  FsErrorManager* error_manager() override { return error_manager_; }
+  scoped_refptr<FsErrorManager> error_manager() override { return 
error_manager_; }
+
+  std::string tenant_id() const override { return tenant_id_; }
 
  protected:
   // Note: all objects passed as pointers should remain alive for the lifetime
   // of the block manager.
   LogBlockManager(Env* env,
-                  DataDirManager* dd_manager,
-                  FsErrorManager* error_manager,
+                  scoped_refptr<DataDirManager> dd_manager,
+                  scoped_refptr<FsErrorManager> error_manager,
                   FileCache* file_cache,
-                  BlockManagerOptions opts);
+                  BlockManagerOptions opts,
+                  std::string tenant_id);
 
   FRIEND_TEST(LogBlockManagerTest, TestAbortBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock);
@@ -441,10 +445,10 @@ class LogBlockManager : public BlockManager {
 
   // Manages and owns the data directories in which the block manager will
   // place its blocks.
-  DataDirManager* dd_manager_;
+  scoped_refptr<DataDirManager> dd_manager_;
 
   // Manages callbacks used to handle disk failure.
-  FsErrorManager* error_manager_;
+  scoped_refptr<FsErrorManager> error_manager_;
 
   // The options that the LogBlockManager was created with.
   const BlockManagerOptions opts_;
@@ -512,6 +516,9 @@ class LogBlockManager : public BlockManager {
   // May be null if instantiated without metrics.
   std::unique_ptr<internal::LogBlockManagerMetrics> metrics_;
 
+  // Which tenant this log block manager belongs to.
+  std::string tenant_id_;
+
   DISALLOW_COPY_AND_ASSIGN(LogBlockManager);
 };
 
@@ -529,11 +536,13 @@ class LogBlockManager : public BlockManager {
 class LogBlockManagerNativeMeta : public LogBlockManager {
  public:
   LogBlockManagerNativeMeta(Env* env,
-                            DataDirManager* dd_manager,
-                            FsErrorManager* error_manager,
+                            scoped_refptr<DataDirManager> dd_manager,
+                            scoped_refptr<FsErrorManager> error_manager,
                             FileCache* file_cache,
-                            BlockManagerOptions opts)
-      : LogBlockManager(env, dd_manager, error_manager, file_cache, 
std::move(opts)) {
+                            BlockManagerOptions opts,
+                            std::string tenant_id)
+      : LogBlockManager(env, std::move(dd_manager), std::move(error_manager),
+                        file_cache, std::move(opts), std::move(tenant_id)) {
   }
 
  private:
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 07925e727..82edad2c0 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -198,15 +198,17 @@ GROUP_FLAG_VALIDATOR(multi_master_pp, 
&ValidateMultiMasterProxiedRpcFlags);
 
 constexpr const char* kReplaceMasterMessage =
     "this master may return incorrect results and should be replaced";
-void CrashMasterOnDiskError(const string& uuid) {
+void CrashMasterOnDiskError(const string& uuid, const string& /* tenant_id */) 
{
   LOG(FATAL) << Substitute("Disk error detected on data directory $0: $1",
                            uuid, kReplaceMasterMessage);
 }
-void CrashMasterOnCFileCorruption(const string& tablet_id) {
+void CrashMasterOnCFileCorruption(const string& tablet_id,
+                                  const string& /* tenant_id */) {
   LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0: 
$1",
                            tablet_id, kReplaceMasterMessage);
 }
-void CrashMasterOnKudu2233Corruption(const string& tablet_id) {
+void CrashMasterOnKudu2233Corruption(const string& tablet_id,
+                                     const string& /* tenant_id */) {
   LOG(FATAL) << Substitute("KUDU-2233 corruption detected on system catalog 
$0: $1 ",
                            tablet_id, kReplaceMasterMessage);
 }
diff --git a/src/kudu/tablet/compaction-test.cc 
b/src/kudu/tablet/compaction-test.cc
index e23b3083d..b1856d83b 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1393,7 +1393,7 @@ TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
   // Fetch the metric for the number of on-disk blocks, so we can later verify
   // that we actually remove data.
   fs::LogBlockManager* lbm = down_cast<fs::LogBlockManager*>(
-      harness_->fs_manager()->block_manager());
+      harness_->fs_manager()->block_manager().get());
 
   vector<BlockId> before_block_ids;
   ASSERT_OK(lbm->GetAllBlockIds(&before_block_ids));
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index b6a31e86c..07598976b 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -984,7 +984,7 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation* 
right) {
 // If 'old_row' has previous versions, this transforms prior version in undos
 // and adds them to 'new_undo_head'.
 Status MergeDuplicatedRowHistory(const string& tablet_id,
-                                 const FsErrorManager* error_manager,
+                                 const scoped_refptr<FsErrorManager>& 
error_manager,
                                  CompactionInputRow* old_row,
                                  Mutation** new_undo_head,
                                  Arena* arena) {
@@ -1343,7 +1343,7 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& 
snap,
 }
 
 Status FlushCompactionInput(const string& tablet_id,
-                            const FsErrorManager* error_manager,
+                            const scoped_refptr<FsErrorManager>& error_manager,
                             CompactionInput* input,
                             const MvccSnapshot& snap,
                             const HistoryGcOpts& history_gc_opts,
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 967fc70b9..c8e5e4f46 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -28,6 +28,7 @@
 
 #include "kudu/common/rowblock.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/util/status.h"
 
@@ -233,7 +234,7 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& 
snap,
 // After return of this function, this CompactionInput object is "used up" and 
will
 // no longer be useful.
 Status FlushCompactionInput(const std::string& tablet_id,
-                            const fs::FsErrorManager* error_manager,
+                            const scoped_refptr<fs::FsErrorManager>& 
error_manager,
                             CompactionInput* input,
                             const MvccSnapshot& snap,
                             const HistoryGcOpts& history_gc_opts,
diff --git a/src/kudu/tablet/delta_compaction.cc 
b/src/kudu/tablet/delta_compaction.cc
index 66516d4d9..8f6d27f56 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -21,6 +21,7 @@
 #include <map>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -38,6 +39,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/cfile_set.h"
@@ -50,9 +52,12 @@
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset_metadata.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/trace.h"
 
+namespace kudu {
+class Arena;
+}
+
 using kudu::fs::BlockCreationTransaction;
 using kudu::fs::BlockManager;
 using kudu::fs::CreateBlockOptions;
@@ -231,7 +236,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const 
IOContext* io_context) {
     nrows += n;
   }
 
-  BlockManager* bm = fs_manager_->block_manager();
+  auto bm = fs_manager_->block_manager();
   unique_ptr<BlockCreationTransaction> transaction = 
bm->NewCreationTransaction();
   RETURN_NOT_OK(base_data_writer_->FinishAndReleaseBlocks(transaction.get()));
 
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index d0f985f87..38a7f0f79 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -41,6 +41,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/fs/io_context.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/compaction.h"
 #include "kudu/tablet/delta_compaction.h"
@@ -242,7 +243,7 @@ Status DiskRowSetWriter::AppendBlock(const RowBlock &block, 
int live_row_count)
 
 Status DiskRowSetWriter::Finish() {
   TRACE_EVENT0("tablet", "DiskRowSetWriter::Finish");
-  BlockManager* bm = rowset_metadata_->fs_manager()->block_manager();
+  auto bm = rowset_metadata_->fs_manager()->block_manager();
   unique_ptr<BlockCreationTransaction> transaction = 
bm->NewCreationTransaction();
   RETURN_NOT_OK(FinishAndReleaseBlocks(transaction.get()));
   return transaction->CommitCreatedBlocks();
@@ -336,7 +337,7 @@ RollingDiskRowSetWriter::RollingDiskRowSetWriter(
       can_roll_(false),
       written_count_(0),
       written_size_(0) {
-  BlockManager* bm = tablet_metadata->fs_manager()->block_manager();
+  auto bm = tablet_metadata->fs_manager()->block_manager();
   block_transaction_ = bm->NewCreationTransaction();
   CHECK(schema.has_column_ids());
 }
diff --git a/src/kudu/tablet/tablet_metadata.cc 
b/src/kudu/tablet/tablet_metadata.cc
index 706415145..979958edb 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -569,7 +569,7 @@ void TabletMetadata::DeleteOrphanedBlocks(const 
BlockIdContainer& blocks) {
     return;
   }
 
-  BlockManager* bm = fs_manager()->block_manager();
+  auto bm = fs_manager()->block_manager();
   shared_ptr<BlockDeletionTransaction> transaction = 
bm->NewDeletionTransaction();
   for (const BlockId& b : blocks) {
     transaction->AddDeletedBlock(b);
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index bc500848e..848064ffc 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -208,7 +208,7 @@ Status UpdateEncryptionKeyInfo(Env* env) {
   metadata->clear_server_key_version();
 
   // Write the new metadata to disk.
-  RETURN_NOT_OK(fs_manager.UpdateMetadata(move(metadata)));
+  RETURN_NOT_OK(fs_manager.UpdateMetadata(metadata));
 
   return Status::OK();
 }
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc 
b/src/kudu/tserver/tablet_copy_client-test.cc
index 186a5da78..e938a9ee4 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -223,8 +223,8 @@ Status TabletCopyClientTest::CompareFileContents(const 
string& path1, const stri
   RETURN_NOT_OK(env_util::OpenFileForRandom(opts, fs_manager_->GetEnv(), 
path2, &file2));
 
   uint64_t size1;
-  uint64_t size2;
   RETURN_NOT_OK(file1->Size(&size1));
+  uint64_t size2;
   RETURN_NOT_OK(file2->Size(&size2));
   size1 -= file1->GetEncryptionHeaderSize();
   size2 -= file2->GetEncryptionHeaderSize();
@@ -547,7 +547,7 @@ TEST_P(TabletCopyClientBasicTest, TestDownloadAllBlocks) {
 // stop the copy client and cause it to fail.
 TEST_P(TabletCopyClientBasicTest, TestFailedDiskStopsClient) {
   ASSERT_OK(StartCopy());
-  DataDirManager* dd_manager = fs_manager_->dd_manager();
+  scoped_refptr<DataDirManager> dd_manager = fs_manager_->dd_manager();
 
   // Repeatedly fetch files for the client.
   Status s;
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 9ae5c841b..0edd765f4 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -214,7 +214,7 @@ TabletCopyClient::TabletCopyClient(
       start_time_micros_(0),
       rng_(GetRandomSeed32()),
       dst_tablet_copy_metrics_(dst_tablet_copy_metrics) {
-  BlockManager* bm = dst_fs_manager->block_manager();
+  auto bm = dst_fs_manager->block_manager();
   transaction_ = bm->NewCreationTransaction();
   if (dst_tablet_copy_metrics_) {
     dst_tablet_copy_metrics_->open_client_sessions->Increment();
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc
index f85bed979..0fa4fd89c 100644
--- a/src/kudu/tserver/tablet_copy_service-test.cc
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -21,8 +21,8 @@
 #include <memory>
 #include <ostream>
 #include <string>
-#include <type_traits>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 5a626096f..54bffc3cc 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -724,7 +724,7 @@ class TabletServerDiskSpaceTest : public TabletServerTestBase,
 // and there are additional directories available, directories are added to the
 // group, and the new groups are persisted to disk.
 TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
-  DataDirManager* dd_manager = mini_server_->server()->fs_manager()->dd_manager();
+  scoped_refptr<DataDirManager> dd_manager = mini_server_->server()->fs_manager()->dd_manager();
   vector<string> dir_group;
   ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
   ASSERT_EQ(kNumDirs - 1, dir_group.size());
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 694a4710f..247d86069 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -134,15 +134,17 @@ Status TabletServer::Start() {
   CHECK_EQ(kInitialized, state_);
 
   fs_manager_->SetErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
-        this->tablet_manager_->FailTabletsInDataDir(uuid);
+      ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string& tenant_id) {
+        this->tablet_manager_->FailTabletsInDataDir(uuid, tenant_id);
       });
   fs_manager_->SetErrorNotificationCb(
-      ErrorHandlerType::CFILE_CORRUPTION, [this](const string& uuid) {
+      ErrorHandlerType::CFILE_CORRUPTION, [this](const string& uuid,
+                                                 const string& /* tenant_id */) {
         this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
       });
   fs_manager_->SetErrorNotificationCb(
-      ErrorHandlerType::KUDU_2233_CORRUPTION, [this](const string& uuid) {
+      ErrorHandlerType::KUDU_2233_CORRUPTION, [this](const string& uuid,
+                                                     const string& /* tenant_id */) {
         this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
       });
   unique_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index fa5c9c9bf..7ccc72efb 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1918,8 +1918,9 @@ Status TSTabletManager::DeleteTabletData(
   return Status::OK();
 }
 
-void TSTabletManager::FailTabletsInDataDir(const string& uuid) {
-  DataDirManager* dd_manager = fs_manager_->dd_manager();
+void TSTabletManager::FailTabletsInDataDir(const string& uuid,
+                                           const string& tenant_id) {
+  scoped_refptr<DataDirManager> dd_manager = fs_manager_->dd_manager(tenant_id);
   int uuid_idx;
   CHECK(dd_manager->FindUuidIndexByUuid(uuid, &uuid_idx))
       << Substitute("No data directory found with UUID $0", uuid);
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 865441f7f..21e14f089 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -242,7 +242,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   void FailTabletAndScheduleShutdown(const std::string& tablet_id);
 
   // Forces shutdown of the tablet replicas in the data dir corresponding to 'uuid'.
-  void FailTabletsInDataDir(const std::string& uuid);
+  void FailTabletsInDataDir(const std::string& uuid, const std::string& tenant_id);
 
   // Refresh the cached counts of tablet states, if the cache is old enough,
   // and return the count for tablet state 'st'.

Reply via email to