This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new bee128371 [multi-tenancy] add add/remove tenant api in fs layer
bee128371 is described below
commit bee1283718fda9d9d28455e62501e52f78fbf565
Author: kedeng <[email protected]>
AuthorDate: Thu Jun 29 15:23:39 2023 +0800
[multi-tenancy] add add/remove tenant api in fs layer
This patch adds an API to the fs layer for adding and
removing tenants.
Additionally, the env, dd_manager, block_manager, and other
relevant parameters are now tenant-specific. The default tenant,
which exists whether or not the multi-tenancy feature is enabled,
uses the default parameters. New tenants created with the
multi-tenancy feature enabled have their own environment
parameters.
Also, I added unit tests to ensure the validity of the newly added
logic.
However, this patch does not standardize the storage paths of
tenants in multi-tenant scenarios; that work is left to the next
patch.
Change-Id: I3f31d3ddd636952f8bd432330afbde018169a2a1
Reviewed-on: http://gerrit.cloudera.org:8080/20144
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <[email protected]>
---
src/kudu/cfile/cfile-test.cc | 2 +-
src/kudu/fs/block_manager-stress-test.cc | 16 +-
src/kudu/fs/block_manager-test.cc | 22 +-
src/kudu/fs/block_manager.h | 22 +-
src/kudu/fs/data_dirs-test.cc | 2 +-
src/kudu/fs/data_dirs.cc | 18 +-
src/kudu/fs/data_dirs.h | 17 +-
src/kudu/fs/dir_manager.cc | 7 +-
src/kudu/fs/dir_manager.h | 10 +-
src/kudu/fs/error_manager-test.cc | 23 +-
src/kudu/fs/error_manager.cc | 9 +-
src/kudu/fs/error_manager.h | 26 +-
src/kudu/fs/file_block_manager.cc | 51 ++--
src/kudu/fs/file_block_manager.h | 24 +-
src/kudu/fs/fs_manager-test.cc | 268 ++++++++++++++---
src/kudu/fs/fs_manager.cc | 411 +++++++++++++++++++++++----
src/kudu/fs/fs_manager.h | 193 +++++++++++--
src/kudu/fs/log_block_manager-test.cc | 36 +--
src/kudu/fs/log_block_manager.cc | 61 ++--
src/kudu/fs/log_block_manager.h | 35 ++-
src/kudu/master/master.cc | 8 +-
src/kudu/tablet/compaction-test.cc | 2 +-
src/kudu/tablet/compaction.cc | 4 +-
src/kudu/tablet/compaction.h | 3 +-
src/kudu/tablet/delta_compaction.cc | 9 +-
src/kudu/tablet/diskrowset.cc | 5 +-
src/kudu/tablet/tablet_metadata.cc | 2 +-
src/kudu/tools/tool_action_fs.cc | 2 +-
src/kudu/tserver/tablet_copy_client-test.cc | 4 +-
src/kudu/tserver/tablet_copy_client.cc | 2 +-
src/kudu/tserver/tablet_copy_service-test.cc | 2 +-
src/kudu/tserver/tablet_server-test.cc | 2 +-
src/kudu/tserver/tablet_server.cc | 10 +-
src/kudu/tserver/ts_tablet_manager.cc | 5 +-
src/kudu/tserver/ts_tablet_manager.h | 2 +-
35 files changed, 1018 insertions(+), 297 deletions(-)
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index cfb19f367..761eed50e 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -975,7 +975,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestReleaseBlock) {
WriterOptions opts;
CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
ASSERT_OK(w.Start());
- BlockManager* bm = fs_manager_->block_manager();
+ auto bm = fs_manager_->block_manager();
unique_ptr<fs::BlockCreationTransaction> transaction =
bm->NewCreationTransaction();
ASSERT_OK(w.FinishAndReleaseBlock(transaction.get()));
ASSERT_OK(transaction->CommitCreatedBlocks());
diff --git a/src/kudu/fs/block_manager-stress-test.cc
b/src/kudu/fs/block_manager-stress-test.cc
index f8f042836..4dd49af5c 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -36,6 +36,7 @@
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/file_block_manager.h" // IWYU pragma: keep
#include "kudu/fs/fs.pb.h"
@@ -149,7 +150,7 @@ class BlockManagerStressTest : public KuduTest {
// Defer block manager creation until after the above flags are set.
bm_.reset(CreateBlockManager());
CHECK_OK(file_cache_.Init());
- CHECK_OK(bm_->Open(nullptr, nullptr, nullptr));
+ CHECK_OK(bm_->Open(nullptr, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
CHECK_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
CHECK_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_,
&test_group_pb_));
}
@@ -186,8 +187,9 @@ class BlockManagerStressTest : public KuduTest {
CHECK_OK(DataDirManager::OpenExistingForTests(env_, data_dirs,
DataDirManagerOptions(), &dd_manager_));
}
- return new T(env_, dd_manager_.get(), &error_manager_,
- &file_cache_, BlockManagerOptions());
+ error_manager_ = new FsErrorManager();
+ return new T(env_, dd_manager_.get(), error_manager_,
+ &file_cache_, BlockManagerOptions(), fs::kDefaultTenantName);
}
void RunTest(double secs) {
@@ -254,13 +256,13 @@ class BlockManagerStressTest : public KuduTest {
// Protects written_blocks_.
simple_spinlock lock_;
- unique_ptr<DataDirManager> dd_manager_;
+ scoped_refptr<DataDirManager> dd_manager_;
- FsErrorManager error_manager_;
+ scoped_refptr<FsErrorManager> error_manager_;
FileCache file_cache_;
- unique_ptr<BlockManager> bm_;
+ scoped_refptr<BlockManager> bm_;
// Test group of disk to spread data across.
DataDirGroupPB test_group_pb_;
@@ -542,7 +544,7 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
LOG(INFO) << "Running on populated block manager (restart #" << i << ")";
this->bm_.reset(this->CreateBlockManager());
FsReport report;
- ASSERT_OK(this->bm_->Open(&report, nullptr, nullptr));
+ ASSERT_OK(this->bm_->Open(&report,
BlockManager::MergeReport::NOT_REQUIRED, nullptr, nullptr));
ASSERT_OK(this->dd_manager_->LoadDataDirGroupFromPB(this->test_tablet_name_,
this->test_group_pb_));
ASSERT_OK(report.LogAndCheckForFatalErrors());
diff --git a/src/kudu/fs/block_manager-test.cc
b/src/kudu/fs/block_manager-test.cc
index 90dbe43e6..1daac8cf2 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -37,6 +37,7 @@
#include "kudu/fs/block_id.h"
#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/dir_manager.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/file_block_manager.h"
#include "kudu/fs/fs.pb.h"
@@ -130,7 +131,7 @@ class BlockManagerTest : public KuduTest {
void SetUp() override {
// Pass in a report to prevent the block manager from logging
unnecessarily.
FsReport report;
- ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+ ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_,
&test_group_pb_));
}
@@ -168,8 +169,9 @@ class BlockManagerTest : public KuduTest {
BlockManagerOptions opts;
opts.metric_entity = metric_entity;
opts.parent_mem_tracker = parent_mem_tracker;
- return new T(env_, this->dd_manager_.get(), &error_manager_,
- &file_cache_, std::move(opts));
+ error_manager_ = new FsErrorManager();
+ return new T(env_, this->dd_manager_.get(), error_manager_,
+ &file_cache_, std::move(opts), fs::kDefaultTenantName);
}
Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
@@ -191,7 +193,7 @@ class BlockManagerTest : public KuduTest {
env_, paths, opts, &dd_manager_));
}
bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker));
- RETURN_NOT_OK(bm_->Open(nullptr, nullptr, nullptr));
+ RETURN_NOT_OK(bm_->Open(nullptr, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
// Certain tests may maintain their own directory groups, in which case
// the default test group should not be used.
@@ -238,10 +240,10 @@ class BlockManagerTest : public KuduTest {
DataDirGroupPB test_group_pb_;
string test_tablet_name_;
CreateBlockOptions test_block_opts_;
- FsErrorManager error_manager_;
- unique_ptr<DataDirManager> dd_manager_;
+ scoped_refptr<FsErrorManager> error_manager_;
+ scoped_refptr<DataDirManager> dd_manager_;
FileCache file_cache_;
- unique_ptr<T> bm_;
+ scoped_refptr<T> bm_;
};
template <>
@@ -249,7 +251,7 @@ void BlockManagerTest<LogBlockManagerNativeMeta>::SetUp() {
RETURN_NOT_LOG_BLOCK_MANAGER();
// Pass in a report to prevent the block manager from logging unnecessarily.
FsReport report;
- ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+ ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
// Store the DataDirGroupPB for tests that reopen the block manager.
@@ -748,10 +750,10 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
// The existing block manager is left open, which proxies for the process
// having crashed without cleanly shutting down the block manager. The
// on-disk metadata should still be clean.
- unique_ptr<BlockManager> new_bm(this->CreateBlockManager(
+ scoped_refptr<BlockManager> new_bm(this->CreateBlockManager(
scoped_refptr<MetricEntity>(),
MemTracker::CreateTracker(-1, "other tracker")));
- ASSERT_OK(new_bm->Open(nullptr, nullptr, nullptr));
+ ASSERT_OK(new_bm->Open(nullptr, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
// Test that the state of all three blocks is properly reflected.
unique_ptr<ReadableBlock> read_block;
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index aad77adb9..b33e47a65 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -195,7 +195,7 @@ struct BlockManagerOptions {
// Utilities for Kudu block lifecycle management. All methods are
// thread-safe.
-class BlockManager {
+class BlockManager : public RefCountedThreadSafe<BlockManager> {
public:
// Lists the available block manager types.
static std::vector<std::string> block_manager_types() {
@@ -208,6 +208,15 @@ class BlockManager {
virtual ~BlockManager() {}
+ enum class MergeReport {
+ NOT_REQUIRED, ///< Do not merge into the fs report when opening the block
+ ///< manager; the report records only the data of this
+ ///< open operation.
+
+ REQUIRED ///< Merge into the fs report when opening the block manager,
+ ///< if the report pointer is not null; the data of this open
+ ///< operation is merged into the incoming report.
+ };
+
// Opens an existing on-disk representation of this block manager and
// checks it for inconsistencies. If found, and if the block manager was not
// constructed in read-only mode, an attempt will be made to repair them.
@@ -223,7 +232,8 @@ class BlockManager {
// If 'containers_processed' and 'containers_total' are not nullptr, they
will
// be populated with total containers attempted to be opened/processed and
// total containers present respectively.
- virtual Status Open(FsReport* report, std::atomic<int>* containers_processed,
+ virtual Status Open(FsReport* report, MergeReport need_merage,
+ std::atomic<int>* containers_processed,
std::atomic<int>* containers_total) = 0;
// Creates a new block using the provided options and opens it for
@@ -273,7 +283,13 @@ class BlockManager {
virtual void NotifyBlockId(BlockId block_id) = 0;
// Exposes the FsErrorManager used to handle fs errors.
- virtual FsErrorManager* error_manager() = 0;
+ virtual scoped_refptr<FsErrorManager> error_manager() = 0;
+
+ // Exposes the tenant id.
+ virtual std::string tenant_id() const = 0;
+
+ private:
+ friend class RefCountedThreadSafe<BlockManager>;
};
// Group a set of block creations together in a transaction. This has two
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index 3584b71c1..9d333d217 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -110,7 +110,7 @@ class DataDirsTest : public KuduTest {
const CreateBlockOptions test_block_opts_;
MetricRegistry registry_;
scoped_refptr<MetricEntity> entity_;
- std::unique_ptr<DataDirManager> dd_manager_;
+ scoped_refptr<DataDirManager> dd_manager_;
};
TEST_F(DataDirsTest, TestCreateGroup) {
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index b5dc54c00..c8b4c8960 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -222,7 +222,7 @@ int DataDir::reserved_bytes() const {
////////////////////////////////////////////////////////////
DataDirManagerOptions::DataDirManagerOptions()
- : DirManagerOptions(FLAGS_block_manager) {}
+ : DirManagerOptions(FLAGS_block_manager, fs::kDefaultTenantID) {}
DataDirManager::DataDirManager(Env* env,
const DataDirManagerOptions& opts,
@@ -233,9 +233,9 @@ DataDirManager::DataDirManager(Env* env,
opts, std::move(canonicalized_data_roots)) {}
Status DataDirManager::OpenExistingForTests(Env* env,
- vector<string> data_fs_roots,
+ const vector<string>&
data_fs_roots,
const DataDirManagerOptions& opts,
- unique_ptr<DataDirManager>*
dd_manager) {
+
scoped_refptr<kudu::fs::DataDirManager>* dd_manager) {
CanonicalizedRootsList roots;
for (const auto& r : data_fs_roots) {
roots.push_back({ r, Status::OK() });
@@ -245,17 +245,17 @@ Status DataDirManager::OpenExistingForTests(Env* env,
Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList
data_fs_roots,
const DataDirManagerOptions& opts,
- unique_ptr<DataDirManager>* dd_manager) {
- unique_ptr<DataDirManager> dm;
+ scoped_refptr<kudu::fs::DataDirManager>*
dd_manager) {
+ scoped_refptr<DataDirManager> dm;
dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots)));
RETURN_NOT_OK(dm->Open());
dd_manager->swap(dm);
return Status::OK();
}
-Status DataDirManager::CreateNewForTests(Env* env, vector<string>
data_fs_roots,
+Status DataDirManager::CreateNewForTests(Env* env, const vector<string>&
data_fs_roots,
const DataDirManagerOptions& opts,
- unique_ptr<DataDirManager>*
dd_manager) {
+
scoped_refptr<kudu::fs::DataDirManager>* dd_manager) {
CanonicalizedRootsList roots;
for (const auto& r : data_fs_roots) {
roots.push_back({ r, Status::OK() });
@@ -265,8 +265,8 @@ Status DataDirManager::CreateNewForTests(Env* env,
vector<string> data_fs_roots,
Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList
data_fs_roots,
const DataDirManagerOptions& opts,
- unique_ptr<DataDirManager>* dd_manager) {
- unique_ptr<DataDirManager> dm;
+ scoped_refptr<kudu::fs::DataDirManager>*
dd_manager) {
+ scoped_refptr<DataDirManager> dm;
dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots)));
RETURN_NOT_OK(dm->Create());
RETURN_NOT_OK(dm->Open());
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index c3083720d..d6f22d66b 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -106,7 +106,8 @@ struct DataDirManagerOptions : public DirManagerOptions {
// Encapsulates knowledge of data directory management on behalf of block
// managers.
-class DataDirManager : public DirManager {
+class DataDirManager : public DirManager,
+ public RefCountedThreadSafe<DataDirManager> {
public:
enum class DirDistributionMode {
ACROSS_ALL_DIRS,
@@ -116,20 +117,20 @@ class DataDirManager : public DirManager {
// Public static initializers for use in tests. When used, data_fs_roots is
// expected to be the successfully canonicalized directories.
static Status CreateNewForTests(Env* env,
- std::vector<std::string> data_fs_roots,
+ const std::vector<std::string>&
data_fs_roots,
const DataDirManagerOptions& opts,
- std::unique_ptr<DataDirManager>* dd_manager);
+ scoped_refptr<DataDirManager>* dd_manager);
static Status OpenExistingForTests(Env* env,
- std::vector<std::string> data_fs_roots,
+ const std::vector<std::string>&
data_fs_roots,
const DataDirManagerOptions& opts,
- std::unique_ptr<DataDirManager>*
dd_manager);
+ scoped_refptr<DataDirManager>*
dd_manager);
// Constructs a directory manager and creates its necessary files on-disk.
//
// Returns an error if any of the directories already exist.
static Status CreateNew(Env* env, CanonicalizedRootsList data_fs_roots,
const DataDirManagerOptions& opts,
- std::unique_ptr<DataDirManager>* dd_manager);
+ scoped_refptr<DataDirManager>* dd_manager);
// Constructs a directory manager and indexes the files found on-disk.
//
@@ -137,7 +138,7 @@ class DataDirManager : public DirManager {
// max allowed, or if locks need to be acquired and cannot be.
static Status OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots,
const DataDirManagerOptions& opts,
- std::unique_ptr<DataDirManager>* dd_manager);
+ scoped_refptr<DataDirManager>* dd_manager);
// Deserializes a DataDirGroupPB and associates the resulting DataDirGroup
// with a tablet_id.
@@ -196,6 +197,8 @@ class DataDirManager : public DirManager {
FRIEND_TEST(DataDirsTest, TestLoadBalancingBias);
FRIEND_TEST(DataDirsTest, TestLoadBalancingDistribution);
FRIEND_TEST(DataDirsTest, TestFailedDirNotAddedToGroup);
+ friend class RefCountedThreadSafe<DataDirManager>;
+ ~DataDirManager() override {}
// Populates the maps to index the given directories.
Status PopulateDirectoryMaps(const std::vector<std::unique_ptr<Dir>>& dirs)
override;
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index 61f3e92f5..94d7c945b 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -159,8 +159,11 @@ Status Dir::RefreshAvailableSpace(RefreshMode mode) {
return Status::OK();
}
-DirManagerOptions::DirManagerOptions(const string& dir_type)
- : dir_type(dir_type), read_only(false),
+DirManagerOptions::DirManagerOptions(string dir_type,
+ string tid)
+ : dir_type(std::move(dir_type)),
+ tenant_id(std::move(tid)),
+ read_only(false),
update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {}
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index b7a54fe40..4502740b5 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -187,6 +187,11 @@ struct DirManagerOptions {
// Must not be empty.
std::string dir_type;
+ // The id used to distinguish different tenants.
+ //
+ // Must not be empty.
+ std::string tenant_id;
+
// The entity under which all metrics should be grouped. If null, metrics
// will not be produced.
//
@@ -205,7 +210,8 @@ struct DirManagerOptions {
UpdateInstanceBehavior update_instances;
protected:
- explicit DirManagerOptions(const std::string& dir_type);
+ explicit DirManagerOptions(std::string dir_type,
+ std::string tid);
};
class DirManager {
@@ -287,6 +293,8 @@ class DirManager {
// the given UUID index.
std::set<std::string> FindTabletsByDirUuidIdx(int uuid_idx) const;
+ const std::string tenant_id() { return opts_.tenant_id; }
+
// Create a new directory using the appropriate directory implementation.
virtual std::unique_ptr<Dir> CreateNewDir(Env* env,
DirMetrics* metrics,
diff --git a/src/kudu/fs/error_manager-test.cc
b/src/kudu/fs/error_manager-test.cc
index 538d3dbe4..26e6c4b69 100644
--- a/src/kudu/fs/error_manager-test.cc
+++ b/src/kudu/fs/error_manager-test.cc
@@ -31,6 +31,7 @@
#include <gtest/gtest.h>
#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/threading/thread_collision_warner.h"
#include "kudu/util/monotime.h"
@@ -86,7 +87,7 @@ class FsErrorManagerTest : public KuduTest {
// time, it is likely that some will write to the same entry.
//
// NOTE: this can be curried into an ErrorNotificationCb.
- void SleepAndWriteFirstEmptyCb(int i, const string& /* s */) {
+ void SleepAndWriteFirstEmptyCb(int i, const string& /* s */, const string&
/* u */) {
DFAKE_SCOPED_LOCK(fake_lock_);
int first_available = FindFirst(-1);
SleepForRand();
@@ -107,14 +108,14 @@ class FsErrorManagerTest : public KuduTest {
return positions;
}
- FsErrorManager* em() const { return em_.get(); }
+ scoped_refptr<FsErrorManager> em() const { return em_; }
protected:
// The single vector that the error notification callbacks will all write to.
vector<int> test_vec_;
private:
- unique_ptr<FsErrorManager> em_;
+ scoped_refptr<FsErrorManager> em_;
// Fake lock used to ensure threads don't run error-handling at the same
time.
DFAKE_MUTEX(fake_lock_);
@@ -130,8 +131,8 @@ TEST_F(FsErrorManagerTest, TestBasicRegistration) {
// Register a callback to update the first '-1' entry in test_vec_ to '0'
// after waiting a random amount of time.
em()->SetErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
- this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid);
+ ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string&
tenant_id) {
+ this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid,
tenant_id);
});
em()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, "");
ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK_ERROR));
@@ -143,8 +144,8 @@ TEST_F(FsErrorManagerTest, TestBasicRegistration) {
// Now register another callback.
em()->SetErrorNotificationCb(
- ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid) {
- this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
uuid);
+ ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid, const
string& tenant_id) {
+ this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
uuid, tenant_id);
});
em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
ASSERT_EQ(1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));
@@ -165,12 +166,12 @@ TEST_F(FsErrorManagerTest, TestBasicRegistration) {
// Test that the callbacks get run serially.
TEST_F(FsErrorManagerTest, TestSerialization) {
em()->SetErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
- this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid);
+ ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string&
tenant_id) {
+ this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid,
tenant_id);
});
em()->SetErrorNotificationCb(
- ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid) {
- this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
uuid);
+ ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid, const
string& tenant_id) {
+ this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
uuid, tenant_id);
});
// Swap back and forth between error-handler type.
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
index 55b78dbdc..b789f8154 100644
--- a/src/kudu/fs/error_manager.cc
+++ b/src/kudu/fs/error_manager.cc
@@ -29,7 +29,8 @@ namespace kudu {
namespace fs {
// Default error-handling callback that no-ops.
-static void DoNothingErrorNotification(const string& /* uuid */) {}
+static void DoNothingErrorNotification(const string& /* uuid */,
+ const string& /* tenant_id */) {}
FsErrorManager::FsErrorManager() {
InsertOrDie(&callbacks_, ErrorHandlerType::DISK_ERROR,
&DoNothingErrorNotification);
@@ -47,9 +48,11 @@ void
FsErrorManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
EmplaceOrUpdate(&callbacks_, e, &DoNothingErrorNotification);
}
-void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e, const string&
uuid) const {
+void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e,
+ const string& uuid,
+ const string& tenant_id) const {
std::lock_guard<Mutex> l(lock_);
- FindOrDie(callbacks_, e)(uuid);
+ FindOrDie(callbacks_, e)(uuid, tenant_id);
}
} // namespace fs
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index 8a32d5714..8c89195a0 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -26,17 +26,18 @@
#include "kudu/fs/dir_manager.h"
#include "kudu/fs/dir_util.h"
#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/util/mutex.h"
namespace kudu {
namespace fs {
// Callback to error-handling code. The input string is the UUID a failed
-// component.
+// component and the ID of the corresponding tenant.
//
// e.g. the ErrorNotificationCb for disk failure handling takes the UUID of a
// directory, marks it failed, and shuts down the tablets in that directory.
-typedef std::function<void(const std::string&)> ErrorNotificationCb;
+typedef std::function<void(const std::string&, const std::string&)>
ErrorNotificationCb;
// Evaluates the expression and handles it if it results in an error.
// Returns if the status is an error.
@@ -129,7 +130,7 @@ enum ErrorHandlerType {
// e.g. the TSTabletManager registers a callback to handle disk failure.
// Blocks and other entities that may hit disk failures can call it without
// knowing about the TSTabletManager.
-class FsErrorManager {
+class FsErrorManager : public RefCountedThreadSafe<FsErrorManager> {
public:
FsErrorManager();
@@ -146,15 +147,28 @@ class FsErrorManager {
// Runs the error notification callback.
//
// 'uuid' is the full UUID of the component that failed.
- void RunErrorNotificationCb(ErrorHandlerType e, const std::string& uuid)
const;
+ // 'tenant_id' identifies the corresponding tenant; if not specified,
+ // the default tenant is assumed.
+ void RunErrorNotificationCb(
+ ErrorHandlerType e,
+ const std::string& uuid,
+ const std::string& tenant_id = fs::kDefaultTenantID) const;
// Runs the error notification callback with the UUID of 'dir'.
- void RunErrorNotificationCb(ErrorHandlerType e, const Dir* dir) const {
+ //
+ // 'tenant_id' identifies the corresponding tenant; if not specified,
+ // the default tenant is assumed.
+ void RunErrorNotificationCb(ErrorHandlerType e,
+ const Dir* dir,
+ const std::string& tenant_id =
fs::kDefaultTenantID) const {
DCHECK_EQ(e, ErrorHandlerType::DISK_ERROR);
- RunErrorNotificationCb(e, dir->instance()->uuid());
+ RunErrorNotificationCb(e, dir->instance()->uuid(), tenant_id);
}
private:
+ friend class RefCountedThreadSafe<FsErrorManager>;
+ ~FsErrorManager() {}
+
// Callbacks to be run when an error occurs.
std::unordered_map<ErrorHandlerType, ErrorNotificationCb, std::hash<int>>
callbacks_;
diff --git a/src/kudu/fs/file_block_manager.cc
b/src/kudu/fs/file_block_manager.cc
index def8b9f0a..e8524fd94 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -96,7 +96,7 @@ namespace internal {
class FileBlockLocation {
public:
// Empty constructor
- FileBlockLocation() {
+ FileBlockLocation() : data_dir_(nullptr) {
}
// Construct a location from its constituent parts.
@@ -260,6 +260,8 @@ class FileWritableBlock final : public WritableBlock {
void HandleError(const Status& s) const;
+ string tenant_id() const { return block_manager_->tenant_id(); }
+
// Starts an asynchronous flush of dirty block data to disk.
Status FlushDataAsync();
@@ -315,7 +317,7 @@ FileWritableBlock::~FileWritableBlock() {
void FileWritableBlock::HandleError(const Status& s) const {
HANDLE_DISK_FAILURE(
s, block_manager_->error_manager()->RunErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, location_.data_dir()));
+ ErrorHandlerType::DISK_ERROR, location_.data_dir(), tenant_id()));
}
Status FileWritableBlock::Close() {
@@ -476,7 +478,7 @@ void FileReadableBlock::HandleError(const Status& s) const {
const Dir* dir = block_manager_->dd_manager_->FindDirByUuidIndex(
internal::FileBlockLocation::GetDirIdx(block_id_));
HANDLE_DISK_FAILURE(s,
block_manager_->error_manager()->RunErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, dir));
+ ErrorHandlerType::DISK_ERROR, dir,
block_manager_->dd_manager_->tenant_id()));
}
FileReadableBlock::FileReadableBlock(FileBlockManager* block_manager,
@@ -684,7 +686,8 @@ Status FileBlockManager::SyncMetadata(const
internal::FileBlockLocation& locatio
if (metrics_) metrics_->total_disk_sync->Increment();
RETURN_NOT_OK_HANDLE_DISK_FAILURE(env_->SyncDir(s),
error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
- location.data_dir()));
+ location.data_dir(),
+ tenant_id()));
}
}
return Status::OK();
@@ -702,20 +705,22 @@ bool FileBlockManager::FindBlockPath(const BlockId&
block_id,
}
FileBlockManager::FileBlockManager(Env* env,
- DataDirManager* dd_manager,
- FsErrorManager* error_manager,
+ scoped_refptr<DataDirManager> dd_manager,
+ scoped_refptr<FsErrorManager> error_manager,
FileCache* file_cache,
- BlockManagerOptions opts)
+ BlockManagerOptions opts,
+ string tenant_id)
: env_(DCHECK_NOTNULL(env)),
- dd_manager_(dd_manager),
- error_manager_(DCHECK_NOTNULL(error_manager)),
+ dd_manager_(std::move(dd_manager)),
+ error_manager_(DCHECK_NOTNULL(std::move(error_manager))),
opts_(std::move(opts)),
file_cache_(file_cache),
rand_(GetRandomSeed32()),
next_block_id_(rand_.Next64()),
mem_tracker_(MemTracker::CreateTracker(-1,
"file_block_manager",
- opts_.parent_mem_tracker)) {
+ opts_.parent_mem_tracker)),
+ tenant_id_(std::move(tenant_id)) {
if (opts_.metric_entity) {
metrics_.reset(new internal::BlockManagerMetrics(opts_.metric_entity));
}
@@ -724,8 +729,9 @@ FileBlockManager::FileBlockManager(Env* env,
FileBlockManager::~FileBlockManager() {
}
-Status FileBlockManager::Open(FsReport* report, std::atomic<int>*
containers_processed,
- std::atomic<int>* containers_total) {
+Status FileBlockManager::Open(FsReport* report, MergeReport need_merage,
+ std::atomic<int>* /* containers_processed */,
+ std::atomic<int>* /* containers_total */) {
// Prepare the filesystem report and either return or log it.
FsReport local_report;
set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -745,7 +751,11 @@ Status FileBlockManager::Open(FsReport* report,
std::atomic<int>* containers_pro
local_report.data_dirs.push_back(dd->dir());
}
if (report) {
- *report = std::move(local_report);
+ if (need_merage == MergeReport::REQUIRED) {
+ report->MergeFrom(local_report);
+ } else {
+ *report = std::move(local_report);
+ }
} else {
RETURN_NOT_OK(local_report.LogAndCheckForFatalErrors());
}
@@ -758,7 +768,9 @@ Status FileBlockManager::CreateBlock(const
CreateBlockOptions& opts,
Dir* dir;
RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
-
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
opts.tablet_id));
+
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
+ opts.tablet_id,
+ tenant_id()));
int uuid_idx;
CHECK(dd_manager_->FindUuidIndexByDir(dir, &uuid_idx));
@@ -794,7 +806,8 @@ Status FileBlockManager::CreateBlock(const
CreateBlockOptions& opts,
// no point in doing so. On disk failure, the tablet specified by 'opts'
// will be shut down, so the returned block would not be used.
RETURN_NOT_OK_HANDLE_DISK_FAILURE(s,
- error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
dir));
+ error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+ dir, tenant_id()));
WritableFileOptions wr_opts;
wr_opts.mode = Env::MUST_CREATE;
wr_opts.is_sensitive = true;
@@ -815,7 +828,8 @@ Status FileBlockManager::CreateBlock(const
CreateBlockOptions& opts,
block->reset(new internal::FileWritableBlock(this, location, writer));
} else {
HANDLE_DISK_FAILURE(s,
- error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
dir));
+ error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+ dir, tenant_id()));
return s;
}
return Status::OK();
@@ -824,8 +838,9 @@ Status FileBlockManager::CreateBlock(const
CreateBlockOptions& opts,
#define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, \
- dd_manager_->FindDirByUuidIndex( \
- internal::FileBlockLocation::GetDirIdx(block_id)))); \
+ dd_manager_->FindDirByUuidIndex( \
+ internal::FileBlockLocation::GetDirIdx(block_id)), \
+ tenant_id())); \
} while (0)
Status FileBlockManager::OpenBlock(const BlockId& block_id,
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 01ba46e77..d770e2386 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -26,7 +26,9 @@
#include <vector>
#include "kudu/fs/block_manager.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/random.h"
@@ -41,7 +43,6 @@ class MemTracker;
namespace fs {
class DataDirManager;
-class FsErrorManager;
struct FsReport;
namespace internal {
@@ -73,14 +74,16 @@ class FileBlockManager : public BlockManager {
// Note: all objects passed as pointers should remain alive for the lifetime
// of the block manager.
FileBlockManager(Env* env,
- DataDirManager* dd_manager,
- FsErrorManager* error_manager,
+ scoped_refptr<DataDirManager> dd_manager,
+ scoped_refptr<FsErrorManager> error_manager,
FileCache* file_cache,
- BlockManagerOptions opts);
+ BlockManagerOptions opts,
+ std::string tenant_id);
~FileBlockManager() override;
- Status Open(FsReport* report, std::atomic<int>* containers_processed,
+ Status Open(FsReport* report, MergeReport need_merage,
+ std::atomic<int>* containers_processed,
std::atomic<int>* containers_total) override;
Status CreateBlock(const CreateBlockOptions& opts,
@@ -97,7 +100,9 @@ class FileBlockManager : public BlockManager {
void NotifyBlockId(BlockId block_id) override;
- FsErrorManager* error_manager() override { return error_manager_; }
+ scoped_refptr<FsErrorManager> error_manager() override { return
error_manager_; }
+
+ std::string tenant_id() const override { return tenant_id_; }
private:
friend class internal::FileBlockDeletionTransaction;
@@ -128,10 +133,10 @@ class FileBlockManager : public BlockManager {
// Manages and owns the data directories in which the block manager will
// place its blocks.
- DataDirManager* dd_manager_;
+ scoped_refptr<DataDirManager> dd_manager_;
// Manages callbacks used to handle disk failure.
- FsErrorManager* error_manager_;
+ scoped_refptr<FsErrorManager> error_manager_;
// The options that the FileBlockManager was created with.
const BlockManagerOptions opts_;
@@ -158,6 +163,9 @@ class FileBlockManager : public BlockManager {
// interesting.
std::shared_ptr<MemTracker> mem_tracker_;
+ // Which tenant this file block manager belongs to.
+ std::string tenant_id_;
+
DISALLOW_COPY_AND_ASSIGN(FileBlockManager);
};
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 2c380bedb..63923bfa0 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -26,6 +26,7 @@
#include <iostream>
#include <iterator>
#include <memory>
+#include <optional>
#include <set>
#include <string>
#include <tuple>
@@ -41,11 +42,13 @@
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/default_key_provider.h"
#include "kudu/fs/dir_manager.h"
#include "kudu/fs/dir_util.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_report.h"
#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
@@ -64,6 +67,7 @@
using kudu::pb_util::ReadPBContainerFromPath;
using kudu::pb_util::SecureDebugString;
+using std::nullopt;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@@ -84,8 +88,27 @@ DECLARE_bool(enable_multi_tenancy);
namespace kudu {
namespace fs {
+static constexpr const char* const kTenantSelectors[] = {
+ "00000000000000000000000000000000", // "default_tenant_kudu"
+ "00000000000000000000000000000001", // "test_tenant_kudu"
+};
+
+static constexpr const char* const kEncryptionType[] = {
+ "kNonEncryption", // FLAGS_encrypt_data_at_rest = false
+ // FLAGS_enable_multi_tenancy = false
+ "kServerEncryption", // FLAGS_encrypt_data_at_rest = true
+ // FLAGS_enable_multi_tenancy = false
+ "kMultiTenantEncryption", // FLAGS_encrypt_data_at_rest = true
+ // FLAGS_enable_multi_tenancy = true
+};
+
+static constexpr const char* const kTestTenantName = "test_tenant_kudu";
+static constexpr const char* const kTestTenantKey =
"00010203040506070809101112131442";
+static constexpr const char* const kTestTenantKeyIv =
"42141312111009080706050403020100";
+static constexpr const char* const kTestTenantKeyVersion = "kudutenantkey@0";
+
class FsManagerTestBase : public KuduTest,
- public testing::WithParamInterface<string> {
+ public
testing::WithParamInterface<std::tuple<string, const char*>> {
public:
FsManagerTestBase()
: fs_root_(GetTestPath("fs_root")) {
@@ -93,12 +116,58 @@ class FsManagerTestBase : public KuduTest,
void SetUp() override {
KuduTest::SetUp();
- FLAGS_block_manager = GetParam();
+ FLAGS_block_manager = std::get<0>(GetParam());
+ auto encryption_type = std::get<1>(GetParam());
+ if (encryption_type == kEncryptionType[0]) {
+ tenant_id_ = kTenantSelectors[0];
+ FLAGS_encrypt_data_at_rest = false;
+ FLAGS_enable_multi_tenancy = false;
+ } else if (encryption_type == kEncryptionType[1]) {
+ tenant_id_ = kTenantSelectors[0];
+ FLAGS_encrypt_data_at_rest = true;
+ FLAGS_enable_multi_tenancy = false;
+ } else if (encryption_type == kEncryptionType[2]) {
+ tenant_id_ = kTenantSelectors[1];
+ FLAGS_encrypt_data_at_rest = true;
+ FLAGS_enable_multi_tenancy = true;
+ } else {
+ ASSERT_TRUE(0);
+ }
// Initialize File-System Layout
ReinitFsManager();
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
+ if (tenant_id() != kTenantSelectors[0]) {
+ // Register the test tenant when not running as the default tenant.
+ ASSERT_OK(fs_manager_->AddTenant(kTestTenantName,
+ tenant_id(),
+ kTestTenantKey,
+ kTestTenantKeyIv,
+ kTestTenantKeyVersion));
+ }
+ }
+
+ Env* GetEnv() const {
+ // TODO(kedeng):
+ // Different tenants should use their own environments, but the
+ // data-loading patches for multi-tenancy have not been merged yet, so
+ // tenant information is ignored when the FS manager is re-opened.
+ // This method is a temporary workaround that lets each test run
+ // successfully; the implementation here will need to be revisited
+ // once per-tenant loading is in place.
+ return fs_manager()->GetEnv(kTenantSelectors[0]);
+ }
+
+ void AddNonDefaultTanent() {
+ if (tenant_id() != kTenantSelectors[0]) {
+ // Register the test tenant when not running as the default tenant.
+ ASSERT_OK(fs_manager_->AddTenant(kTestTenantName,
+ tenant_id(),
+ kTestTenantKey,
+ kTestTenantKeyIv,
+ kTestTenantKeyVersion));
+ }
}
void ReinitFsManager() {
@@ -122,36 +191,141 @@ class FsManagerTestBase : public KuduTest,
// Test Write
unique_ptr<WritableBlock> writer;
- ASSERT_OK(fs_manager()->CreateNewBlock({}, &writer));
+ ASSERT_OK(fs_manager()->CreateNewBlock({}, &writer, tenant_id()));
ASSERT_OK(writer->Append(data));
ASSERT_OK(writer->Close());
// Test Read
Slice result(buffer, data.size());
unique_ptr<ReadableBlock> reader;
- ASSERT_OK(fs_manager()->OpenBlock(writer->id(), &reader));
+ ASSERT_OK(fs_manager()->OpenBlock(writer->id(), &reader, tenant_id()));
ASSERT_OK(reader->Read(0, result));
ASSERT_EQ(data, result);
}
FsManager *fs_manager() const { return fs_manager_.get(); }
+ const string& tenant_id() const { return tenant_id_; }
+
protected:
const string fs_root_;
private:
unique_ptr<FsManager> fs_manager_;
+ string tenant_id_;
};
+
INSTANTIATE_TEST_SUITE_P(BlockManagerTypes, FsManagerTestBase,
-
::testing::ValuesIn(BlockManager::block_manager_types()));
+ ::testing::Combine(
+ ::testing::ValuesIn(BlockManager::block_manager_types()),
+ ::testing::ValuesIn(kEncryptionType)));
TEST_P(FsManagerTestBase, TestBaseOperations) {
- fs_manager()->DumpFileSystemTree(std::cout);
+ fs_manager()->DumpFileSystemTree(std::cout, tenant_id());
TestReadWriteDataFile(Slice("test0"));
TestReadWriteDataFile(Slice("test1"));
- fs_manager()->DumpFileSystemTree(std::cout);
+ fs_manager()->DumpFileSystemTree(std::cout, tenant_id());
+}
+
+TEST_P(FsManagerTestBase, TestTenantAccountOperation) {
+ int tenant_num = fs_manager()->tenants_count();
+ const string non_exist_tenant_name = "non_exist_tenant_name";
+ const string non_exist_tenant = "10000000000000000000000000000000";
+ const string default_tenant_id = "00000000000000000000000000000000";
+ auto encryption_type = std::get<1>(GetParam());
+ if (encryption_type != kEncryptionType[2]) {
+ if (encryption_type == kEncryptionType[0]) {
+ // Multi-tenancy is disabled.
+ ASSERT_FALSE(FLAGS_enable_multi_tenancy);
+ ASSERT_FALSE(FLAGS_encrypt_data_at_rest);
+ } else if (encryption_type == kEncryptionType[1]) {
+ // Multi-tenancy is disabled but data at rest encryption is enabled.
+ ASSERT_FALSE(FLAGS_enable_multi_tenancy);
+ ASSERT_TRUE(FLAGS_encrypt_data_at_rest);
+ }
+ ASSERT_EQ(0, tenant_num);
+ ASSERT_FALSE(fs_manager()->is_tenants_exist());
+
+ // Adding a tenant is not allowed.
+ ASSERT_DEATH(fs_manager()->AddTenant(non_exist_tenant_name,
non_exist_tenant,
+ nullopt, nullopt, nullopt), "");
+ ASSERT_FALSE(fs_manager()->VertifyTenant(non_exist_tenant));
+ ASSERT_TRUE(fs_manager()->VertifyTenant(default_tenant_id));
+ ASSERT_EQ(0, fs_manager()->GetDataRootDirs(non_exist_tenant).size());
+ ASSERT_NE(0, fs_manager()->GetDataRootDirs(default_tenant_id).size());
+ ASSERT_FALSE(fs_manager()->block_manager(non_exist_tenant));
+ ASSERT_TRUE(fs_manager()->block_manager(default_tenant_id));
+
+ // Removing a tenant is not allowed.
+ const auto s = fs_manager()->RemoveTenant(non_exist_tenant);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ Substitute("Not support for removing tenant for id: $0.",
non_exist_tenant));
+ } else {
+ // Multi-tenancy is enabled.
+ ASSERT_TRUE(FLAGS_enable_multi_tenancy);
+ ASSERT_TRUE(FLAGS_encrypt_data_at_rest);
+ ASSERT_EQ(2, tenant_num);
+ ASSERT_TRUE(fs_manager()->is_tenants_exist());
+ for (const auto& tenant : kTenantSelectors) {
+ ASSERT_TRUE(fs_manager()->is_tenant_exist(tenant));
+ ASSERT_TRUE(fs_manager()->VertifyTenant(tenant));
+ // Re-adding a tenant that already exists must fail.
+ string new_tenant = "new_tenant_name";
+ const auto s = fs_manager()->AddTenant(new_tenant, tenant, nullopt,
+ nullopt, nullopt);
+ ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(),
+ Substitute("Tenant $0 already exists.", tenant));
+ }
+
+ ASSERT_FALSE(fs_manager()->is_tenant_exist(non_exist_tenant));
+
+ // Test add tenant.
+ string new_tenant_name = "new_tenant_name";
+ string new_tenant = "00000000000000000000000000000011";
+ // Make sure the new tenant does not exist.
+ ASSERT_FALSE(fs_manager()->is_tenant_exist(new_tenant));
+ // Generate the key info needed to initialize the tenant.
+ security::DefaultKeyProvider key_provider;
+ string encrypted_key;
+ string iv;
+ string version;
+ ASSERT_OK(key_provider.GenerateEncryptionKey(&encrypted_key, &iv,
&version));
+ ASSERT_OK(fs_manager()->AddTenant(new_tenant_name, new_tenant,
+ encrypted_key, iv, version));
+
+ // The key info we read back must equal what we set.
+ ASSERT_EQ(new_tenant_name, fs_manager()->tenant_name(new_tenant));
+ ASSERT_EQ(encrypted_key, fs_manager()->tenant_key(new_tenant));
+ ASSERT_EQ(iv, fs_manager()->tenant_key_iv(new_tenant));
+ ASSERT_EQ(version, fs_manager()->tenant_key_version(new_tenant));
+
+ // The new tenant must exist after 'AddTenant'.
+ ASSERT_TRUE(fs_manager()->is_tenant_exist(new_tenant));
+ ASSERT_EQ(3, fs_manager()->tenants_count());
+ for (auto& tenant : fs_manager()->GetAllTenants()) {
+ ASSERT_TRUE(fs_manager()->is_tenant_exist(tenant));
+ ASSERT_TRUE(fs_manager()->VertifyTenant(tenant));
+ ASSERT_NE(0, fs_manager()->GetDataRootDirs(tenant).size());
+ }
+
+ // Test remove tenant.
+ ASSERT_TRUE(fs_manager()->is_tenant_exist(new_tenant));
+ ASSERT_OK(fs_manager()->RemoveTenant(new_tenant));
+ ASSERT_FALSE(fs_manager()->is_tenant_exist(new_tenant));
+ ASSERT_EQ(2, fs_manager()->tenants_count());
+ for (auto& tenant : fs_manager()->GetAllTenants()) {
+ ASSERT_TRUE(fs_manager()->is_tenant_exist(tenant));
+ }
+
+ // Removing the default tenant is not allowed.
+ const auto s = fs_manager()->RemoveTenant(default_tenant_id);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Remove default tenant is not allowed.");
+ }
}
TEST_P(FsManagerTestBase, TestIllegalPaths) {
@@ -181,8 +355,10 @@ TEST_P(FsManagerTestBase, TestDuplicatePaths) {
string path = GetTestPath("foo");
ReinitFsManagerWithPaths(path, { path, path, path });
ASSERT_OK(fs_manager()->CreateInitialFileSystemLayout());
+ ASSERT_OK(fs_manager()->Open());
+ NO_FATALS(AddNonDefaultTanent());
ASSERT_EQ(vector<string>({ JoinPathSegments(path,
fs_manager()->kDataDirName) }),
- fs_manager()->GetDataRootDirs());
+ fs_manager()->GetDataRootDirs(tenant_id()));
}
TEST_P(FsManagerTestBase, TestListTablets) {
@@ -192,21 +368,21 @@ TEST_P(FsManagerTestBase, TestListTablets) {
string path = fs_manager()->GetTabletMetadataDir();
unique_ptr<WritableFile> writer;
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "foo.kudutmp"), &writer));
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "foo.kudutmp.abc123"), &writer));
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "foo.bak"), &writer));
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "foo.bak.abc123"), &writer));
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, ".hidden"), &writer));
// An uncanonicalized id.
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "6ba7b810-9dad-11d1-80b4-00c04fd430c8"),
&writer));
// 1 valid tablet id.
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "922ff7ed14c14dbca4ee16331dfda42a"), &writer));
ASSERT_OK(fs_manager()->ListTabletIds(&tablet_ids));
@@ -215,10 +391,10 @@ TEST_P(FsManagerTestBase, TestListTablets) {
TEST_P(FsManagerTestBase, TestCannotUseNonEmptyFsRoot) {
string path = GetTestPath("new_fs_root");
- ASSERT_OK(env_->CreateDir(path));
+ ASSERT_OK(GetEnv()->CreateDir(path));
{
unique_ptr<WritableFile> writer;
- ASSERT_OK(env_->NewWritableFile(
+ ASSERT_OK(GetEnv()->NewWritableFile(
JoinPathSegments(path, "some_file"), &writer));
}
@@ -236,7 +412,7 @@ TEST_P(FsManagerTestBase, TestEmptyWALPath) {
TEST_P(FsManagerTestBase, TestOnlyWALPath) {
string path = GetTestPath("new_fs_root");
- ASSERT_OK(env_->CreateDir(path));
+ ASSERT_OK(GetEnv()->CreateDir(path));
ReinitFsManagerWithPaths(path, {});
ASSERT_OK(fs_manager()->CreateInitialFileSystemLayout());
@@ -332,8 +508,8 @@ TEST_P(FsManagerTestBase, TestMetadataDirInDataRoot) {
Status s = fs_manager()->Open();
ASSERT_STR_CONTAINS(s.ToString(), "could not verify required directory");
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
- ASSERT_FALSE(env_->FileExists(opts.data_roots[0]));
- ASSERT_TRUE(env_->FileExists(opts.data_roots[1]));
+ ASSERT_FALSE(GetEnv()->FileExists(opts.data_roots[0]));
+ ASSERT_TRUE(GetEnv()->FileExists(opts.data_roots[1]));
// Now allow the reordering by specifying the expected metadata root.
opts.metadata_root = opts.data_roots[1];
@@ -414,7 +590,7 @@ TEST_P(FsManagerTestBase, TestCreateWithFailedDirs) {
// Create some top-level paths to place roots in.
vector<string> data_paths = { GetTestPath("data1"), GetTestPath("data2"),
GetTestPath("data3") };
for (const string& path : data_paths) {
- env_->CreateDir(path);
+ GetEnv()->CreateDir(path);
}
// Initialize the FS layout with roots in subdirectories of data_paths. When
// we canonicalize paths, we canonicalize the dirname of each path (e.g.
@@ -440,18 +616,18 @@ TEST_P(FsManagerTestBase,
TestOpenWithDuplicateInstanceFiles) {
WritableFileOptions wr_opts;
wr_opts.mode = Env::MUST_CREATE;
const string duplicate_test_root = GetTestPath("fs_dup");
- ASSERT_OK(env_->CreateDir(duplicate_test_root));
+ ASSERT_OK(GetEnv()->CreateDir(duplicate_test_root));
const string duplicate_instance = JoinPathSegments(
duplicate_test_root, FsManager::kInstanceMetadataFileName);
- ASSERT_OK(env_util::CopyFile(env_,
fs_manager()->GetInstanceMetadataPath(fs_root_),
+ ASSERT_OK(env_util::CopyFile(GetEnv(),
fs_manager()->GetInstanceMetadataPath(fs_root_),
duplicate_instance, wr_opts));
// Make a copy of the per-directory instance file.
const string duplicate_test_dir = JoinPathSegments(duplicate_test_root,
kDataDirName);
- ASSERT_OK(env_->CreateDir(duplicate_test_dir));
+ ASSERT_OK(GetEnv()->CreateDir(duplicate_test_dir));
const string duplicate_dir_instance = JoinPathSegments(
duplicate_test_dir, kInstanceMetadataFileName);
- ASSERT_OK(env_util::CopyFile(env_,
+ ASSERT_OK(env_util::CopyFile(GetEnv(),
fs_manager()->dd_manager()->FindDirByUuidIndex(0)->instance()->path(),
duplicate_dir_instance, wr_opts));
@@ -543,7 +719,7 @@ TEST_P(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
// empty disk and attempt to use it. Upon opening the FS layout, we should
// see no failed directories.
FLAGS_env_inject_eio = 0;
- ASSERT_OK(env_->DeleteRecursively(new_root));
+ ASSERT_OK(GetEnv()->DeleteRecursively(new_root));
ReinitFsManagerWithOpts(opts);
ASSERT_OK(fs_manager()->Open());
ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
@@ -571,7 +747,7 @@ TEST_P(FsManagerTestBase, TestOpenWithUnhealthyDataDir) {
// The above behavior should be seen if the data directories are missing...
FLAGS_env_inject_eio = 0;
for (const auto& root : opts.data_roots) {
- ASSERT_OK(env_->DeleteRecursively(root));
+ ASSERT_OK(GetEnv()->DeleteRecursively(root));
}
ReinitFsManagerWithOpts(opts);
s = fs_manager()->Open();
@@ -591,8 +767,8 @@ TEST_P(FsManagerTestBase,
TestOpenWithCanonicalizationFailure) {
// Create some parent directories and subdirectories.
const string dir1 = GetTestPath("test1");
const string dir2 = GetTestPath("test2");
- ASSERT_OK(env_->CreateDir(dir1));
- ASSERT_OK(env_->CreateDir(dir2));
+ ASSERT_OK(GetEnv()->CreateDir(dir1));
+ ASSERT_OK(GetEnv()->CreateDir(dir2));
const string subdir1 = GetTestPath("test1/subdir");
const string subdir2 = GetTestPath("test2/subdir");
FsManagerOpts opts;
@@ -611,7 +787,7 @@ TEST_P(FsManagerTestBase,
TestOpenWithCanonicalizationFailure) {
// Now fail the canonicalization by deleting a parent directory. This
// simulates the mountpoint disappearing.
- ASSERT_OK(env_->DeleteRecursively(dir2));
+ ASSERT_OK(GetEnv()->DeleteRecursively(dir2));
ReinitFsManagerWithOpts(opts);
Status s = fs_manager()->Open();
ASSERT_OK(s);
@@ -622,7 +798,7 @@ TEST_P(FsManagerTestBase,
TestOpenWithCanonicalizationFailure) {
}
// Let's try that again, but with the appropriate mountpoint/directory.
- ASSERT_OK(env_->CreateDir(dir2));
+ ASSERT_OK(GetEnv()->CreateDir(dir2));
ReinitFsManagerWithOpts(opts);
ASSERT_OK(fs_manager()->Open());
ASSERT_EQ(0, fs_manager()->dd_manager()->GetFailedDirs().size());
@@ -740,14 +916,14 @@ TEST_P(FsManagerTestBase, TestUmask) {
TEST_P(FsManagerTestBase, TestOpenFailsWhenMissingImportantDir) {
const string kWalRoot = fs_manager()->GetWalsRootDir();
- ASSERT_OK(env_->DeleteDir(kWalRoot));
+ ASSERT_OK(GetEnv()->DeleteDir(kWalRoot));
ReinitFsManager();
Status s = fs_manager()->Open();
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "could not verify required directory");
unique_ptr<WritableFile> f;
- ASSERT_OK(env_->NewWritableFile(kWalRoot, &f));
+ ASSERT_OK(GetEnv()->NewWritableFile(kWalRoot, &f));
s = fs_manager()->Open();
ASSERT_TRUE(s.IsCorruption()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "exists but is not a directory");
@@ -807,7 +983,7 @@ TEST_P(FsManagerTestBase, TestEIOWhileChangingDirs) {
for (int i = 0; i < kMaxDirs; i++) {
const string dir = Substitute("$0$1", kTestPathBase, i);
all_dirs.emplace_back(dir);
- ASSERT_OK(env_->CreateDir(dir));
+ ASSERT_OK(GetEnv()->CreateDir(dir));
}
FsManagerOpts opts;
opts.wal_root = all_dirs[0];
@@ -840,7 +1016,7 @@ TEST_P(FsManagerTestBase,
TestEIOWhileRunningUpdateDirsTool) {
// Helper to get a new root.
auto create_root = [&] (int i) {
const string dir = Substitute("$0$1", kTestPathBase, i);
- CHECK_OK(env_->CreateDir(dir));
+ CHECK_OK(GetEnv()->CreateDir(dir));
return dir;
};
@@ -860,7 +1036,7 @@ TEST_P(FsManagerTestBase,
TestEIOWhileRunningUpdateDirsTool) {
// Collect the contents of the InstanceMetadataPB objects.
const auto instance_path = JoinPathSegments(root,
FsManager::kInstanceMetadataFileName);
unique_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB);
- Status s = ReadPBContainerFromPath(env_, instance_path, pb.get(),
pb_util::NOT_SENSITIVE);
+ Status s = ReadPBContainerFromPath(GetEnv(), instance_path, pb.get(),
pb_util::NOT_SENSITIVE);
if (s.IsNotFound()) {
InsertOrDie(&instances, instance_path, "");
} else {
@@ -872,7 +1048,7 @@ TEST_P(FsManagerTestBase,
TestEIOWhileRunningUpdateDirsTool) {
unique_ptr<DirInstanceMetadataPB> bmi_pb(new DirInstanceMetadataPB);
const auto block_manager_instance = JoinPathSegments(
JoinPathSegments(root, kDataDirName), kInstanceMetadataFileName);
- s = ReadPBContainerFromPath(env_, block_manager_instance,
+ s = ReadPBContainerFromPath(GetEnv(), block_manager_instance,
bmi_pb.get(), pb_util::NOT_SENSITIVE);
if (s.IsNotFound()) {
InsertOrDie(&instances, block_manager_instance, "");
@@ -939,7 +1115,7 @@ TEST_P(FsManagerTestBase, TestReAddRemovedDataDir) {
opts.data_roots = data_roots;
ReinitFsManagerWithOpts(opts);
ASSERT_OK(fs_manager()->Open());
- DataDirManager* dd_manager = fs_manager()->dd_manager();
+ auto dd_manager = fs_manager()->dd_manager();
ASSERT_EQ(data_roots.size(), dd_manager->GetDirs().size());
// Since we haven't deleted any directories or instance files, ensure that
@@ -967,8 +1143,8 @@ TEST_P(FsManagerTestBase,
TestCannotRemoveDataDirServingAsMetadataDir) {
// Create a new fs layout with a metadata root explicitly set to the first
// data root.
- ASSERT_OK(env_->DeleteRecursively(fs_root_));
- ASSERT_OK(env_->CreateDir(fs_root_));
+ ASSERT_OK(GetEnv()->DeleteRecursively(fs_root_));
+ ASSERT_OK(GetEnv()->CreateDir(fs_root_));
FsManagerOpts opts;
opts.data_roots = { JoinPathSegments(fs_root_, "data1"),
@@ -1127,14 +1303,14 @@ TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
// into the new fs root) then retry.
string source_instance = fs_manager()->GetInstanceMetadataPath(fs_root_);
bool is_dir;
- Status s = env_->IsDirectory(fs_root, &is_dir);
+ Status s = GetEnv()->IsDirectory(fs_root, &is_dir);
if (s.ok()) {
ASSERT_TRUE(is_dir);
string new_instance = fs_manager()->GetInstanceMetadataPath(fs_root);
- if (!env_->FileExists(new_instance)) {
+ if (!GetEnv()->FileExists(new_instance)) {
WritableFileOptions wr_opts;
wr_opts.mode = Env::MUST_CREATE;
- ASSERT_OK(env_util::CopyFile(env_, source_instance, new_instance,
wr_opts));
+ ASSERT_OK(env_util::CopyFile(GetEnv(), source_instance,
new_instance, wr_opts));
ReinitFsManagerWithOpts(fs_opts);
open_status = fs_manager()->Open();
}
@@ -1163,10 +1339,10 @@ TEST_P(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
string data_dir = JoinPathSegments(root, kDataDirName);
string instance = JoinPathSegments(data_dir,
kInstanceMetadataFileName);
- ASSERT_TRUE(env_->FileExists(instance));
+ ASSERT_TRUE(GetEnv()->FileExists(instance));
string copy = instance + kTmpInfix;
- if (env_->FileExists(copy)) {
- ASSERT_OK(env_->RenameFile(copy, instance));
+ if (GetEnv()->FileExists(copy)) {
+ ASSERT_OK(GetEnv()->RenameFile(copy, instance));
repaired = true;
}
}
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 4a43c74a2..3017209fd 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -165,6 +165,7 @@ DECLARE_bool(enable_multi_tenancy);
DECLARE_bool(encrypt_data_at_rest);
DECLARE_int32(encryption_key_length);
+using kudu::fs::BlockManager;
using kudu::fs::BlockManagerOptions;
using kudu::fs::CreateBlockOptions;
using kudu::fs::DataDirManager;
@@ -243,7 +244,20 @@ FsManager::FsManager(Env* env, FsManagerOpts opts)
}
}
-FsManager::~FsManager() {}
+FsManager::~FsManager() {
+ {
+ std::lock_guard<LockType> lock(ddm_lock_);
+ dd_manager_map_.clear();
+ }
+ {
+ std::lock_guard<LockType> lock(bm_lock_);
+ block_manager_map_.clear();
+ }
+ {
+ std::lock_guard<LockType> lock(env_lock_);
+ env_map_.clear();
+ }
+}
void FsManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb
cb) {
error_manager_->SetErrorNotificationCb(e, std::move(cb));
@@ -375,20 +389,34 @@ Status FsManager::Init() {
return Status::OK();
}
-void FsManager::InitBlockManager() {
+scoped_refptr<BlockManager> FsManager::InitBlockManager(const string&
tenant_id) {
+ auto block_manager = SearchBlockManager(tenant_id);
+ if (block_manager) {
+ return block_manager;
+ }
+
BlockManagerOptions bm_opts;
bm_opts.metric_entity = opts_.metric_entity;
bm_opts.parent_mem_tracker = opts_.parent_mem_tracker;
bm_opts.read_only = opts_.read_only;
if (opts_.block_manager_type == "file") {
- block_manager_.reset(new FileBlockManager(
- GetEnv(), dd_manager_.get(), error_manager_.get(), opts_.file_cache,
std::move(bm_opts)));
+ block_manager.reset(new FileBlockManager(
+ GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
+ opts_.file_cache, std::move(bm_opts), tenant_id));
} else if (opts_.block_manager_type == "log") {
- block_manager_.reset(new LogBlockManagerNativeMeta(
- GetEnv(), dd_manager_.get(), error_manager_.get(), opts_.file_cache,
std::move(bm_opts)));
+ block_manager.reset(new LogBlockManagerNativeMeta(
+ GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
+ opts_.file_cache, std::move(bm_opts), tenant_id));
} else {
LOG(FATAL) << "Unknown block_manager_type: " << opts_.block_manager_type;
}
+
+ {
+ std::lock_guard<LockType> lock(bm_lock_);
+ block_manager_map_[tenant_id] = block_manager;
+ }
+
+ return block_manager;
}
Status FsManager::PartialOpen(CanonicalizedRootsList* missing_roots) {
@@ -520,9 +548,9 @@ Status FsManager::Open(FsReport* report, Timer*
read_instance_metadata_files,
//
// The priority of tenant key is higher than that of server key.
RETURN_NOT_OK(
-
key_provider_->DecryptEncryptionKey(this->tenant_key(fs::kDefaultTenantName),
-
this->tenant_key_iv(fs::kDefaultTenantName),
-
this->tenant_key_version(fs::kDefaultTenantName),
+
key_provider_->DecryptEncryptionKey(this->tenant_key(fs::kDefaultTenantID),
+
this->tenant_key_iv(fs::kDefaultTenantID),
+
this->tenant_key_version(fs::kDefaultTenantID),
&decrypted_key));
} else if (!server_key().empty() && key_provider_) {
// Just check whether the upgrade operation is needed for
'--enable_multi_tenancy'.
@@ -547,17 +575,7 @@ Status FsManager::Open(FsReport* report, Timer*
read_instance_metadata_files,
}
// Open the directory manager if it has not been opened already.
- if (!dd_manager_) {
- DataDirManagerOptions dm_opts;
- dm_opts.metric_entity = opts_.metric_entity;
- dm_opts.read_only = opts_.read_only;
- dm_opts.dir_type = opts_.block_manager_type;
- dm_opts.update_instances = opts_.update_instances;
- LOG_TIMING(INFO, "opening directory manager") {
- RETURN_NOT_OK(DataDirManager::OpenExisting(GetEnv(),
- canonicalized_data_fs_roots_, dm_opts, &dd_manager_));
- }
- }
+ RETURN_NOT_OK(OpenDataDirManager());
// Only clean temporary files after the data dir manager successfully opened.
// This ensures that we were able to obtain the exclusive directory locks
@@ -569,8 +587,8 @@ Status FsManager::Open(FsReport* report, Timer*
read_instance_metadata_files,
// Set an initial error handler to mark data directories as failed.
error_manager_->SetErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
- this->dd_manager_->MarkDirFailedByUuid(uuid);
+ ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string&
tenant_id) {
+ this->dd_manager(tenant_id)->MarkDirFailedByUuid(uuid);
});
// Finally, initialize and open the block manager if needed.
@@ -578,14 +596,11 @@ Status FsManager::Open(FsReport* report, Timer*
read_instance_metadata_files,
if (read_data_directories) {
read_data_directories->Start();
}
- InitBlockManager();
- LOG_TIMING(INFO, "opening block manager") {
- if (opts_.block_manager_type == "file") {
- RETURN_NOT_OK(block_manager_->Open(report, nullptr, nullptr));
- } else {
- RETURN_NOT_OK(block_manager_->Open(report, containers_processed,
containers_total));
- }
- }
+ RETURN_NOT_OK(InitAndOpenBlockManager(report,
+ containers_processed,
+ containers_total,
+ fs::kDefaultTenantID,
+
BlockManager::MergeReport::REQUIRED));
if (read_data_directories) {
read_data_directories->Stop();
if (opts_.metric_entity && opts_.block_manager_type == "log") {
@@ -625,13 +640,89 @@ Status FsManager::Open(FsReport* report, Timer*
read_instance_metadata_files,
return Status::OK();
}
+Status FsManager::AddDataDirManager(scoped_refptr<DataDirManager> dd_manager,
+ const string& tenant_id) {
+ std::lock_guard<LockType> lock(ddm_lock_);
+ scoped_refptr<DataDirManager> ddm(FindPtrOrNull(dd_manager_map_, tenant_id));
+ if (ddm) {
+ return Status::AlreadyPresent(Substitute("Tenant $0 already exists.",
tenant_id));
+ }
+ dd_manager_map_[tenant_id] = std::move(dd_manager);
+ return Status::OK();
+}
+
+CanonicalizedRootsList FsManager::get_canonicalized_data_fs_roots(const
string& tenant_id) const {
+ if (tenant_id == fs::kDefaultTenantID) {
+ return canonicalized_data_fs_roots_;
+ }
+
+ // TODO(kedeng):
+ // Different tenants should own different data storage paths, and the new
+ // solution needs to be compatible with existing implementation details.
+ return canonicalized_data_fs_roots_;
+}
+
+Status FsManager::CreateNewDataDirManager(const string& tenant_id) {
+ CHECK(!dd_manager(tenant_id));
+
+ scoped_refptr<DataDirManager> ddm = nullptr;
+ DataDirManagerOptions dm_opts;
+ dm_opts.metric_entity = opts_.metric_entity;
+ dm_opts.read_only = opts_.read_only;
+ dm_opts.dir_type = opts_.block_manager_type;
+ dm_opts.tenant_id = tenant_id;
+ LOG_TIMING(INFO, "creating directory manager") {
+ RETURN_NOT_OK(DataDirManager::CreateNew(GetEnv(tenant_id),
+ get_canonicalized_data_fs_roots(tenant_id), dm_opts, &ddm));
+ }
+ RETURN_NOT_OK(AddDataDirManager(ddm, tenant_id));
+
+ return Status::OK();
+}
+
+Status FsManager::OpenDataDirManager(const string& tenant_id) {
+ if (!dd_manager(tenant_id)) {
+ scoped_refptr<DataDirManager> ddm = nullptr;
+ DataDirManagerOptions dm_opts;
+ dm_opts.metric_entity = opts_.metric_entity;
+ dm_opts.read_only = opts_.read_only;
+ dm_opts.dir_type = opts_.block_manager_type;
+ dm_opts.update_instances = opts_.update_instances;
+ dm_opts.tenant_id = tenant_id;
+ LOG_TIMING(INFO, "opening directory manager") {
+ RETURN_NOT_OK(DataDirManager::OpenExisting(GetEnv(tenant_id),
+ get_canonicalized_data_fs_roots(tenant_id), dm_opts, &ddm));
+ }
+ RETURN_NOT_OK(AddDataDirManager(ddm, tenant_id));
+ }
+ return Status::OK();
+}
+
+Status FsManager::InitAndOpenBlockManager(FsReport* report,
+ std::atomic<int>*
containers_processed,
+ std::atomic<int>* containers_total,
+ const string& tenant_id,
+ BlockManager::MergeReport
need_merage) {
+ auto block_manager = InitBlockManager(tenant_id);
+ DCHECK(block_manager);
+ LOG_TIMING(INFO, "opening block manager") {
+ if (opts_.block_manager_type == "file") {
+ RETURN_NOT_OK(block_manager->Open(report, need_merage, nullptr,
nullptr));
+ } else {
+ RETURN_NOT_OK(block_manager->Open(report, need_merage,
+ containers_processed,
containers_total));
+ }
+ }
+ return Status::OK();
+}
+
void FsManager::CopyMetadata(
unique_ptr<InstanceMetadataPB>* metadata) {
shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
(*metadata)->CopyFrom(*metadata_);
}
-Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB> metadata) {
+Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB>& metadata) {
// In the event of failure, rollback everything we changed.
// <string, string> <=> <old instance file, backup instance file>
unordered_map<string, string> changed_dirs;
@@ -691,6 +782,48 @@ void
FsManager::UpdateMetadataFormatAndStampUnlock(InstanceMetadataPB* metadata)
metadata->set_format_stamp(Substitute("Formatted at $0 on $1", time_str,
hostname));
}
+// Appends a tenant record built from the given name, id and key information
+// to a copy of the current instance metadata, refreshes the format stamp on
+// that copy and hands it to UpdateMetadata().
+Status FsManager::AddTenantMetadata(const string& tenant_name,
+                                    const string& tenant_id,
+                                    const string& tenant_key,
+                                    const string& tenant_key_iv,
+                                    const string& tenant_key_version) {
+  // Snapshot the current metadata under the shared lock; all edits below are
+  // made on the private copy.
+  auto updated = std::make_unique<InstanceMetadataPB>();
+  {
+    shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
+    updated->CopyFrom(*metadata_);
+  }
+
+  auto* tenant = updated->add_tenants();
+  tenant->set_tenant_name(tenant_name);
+  tenant->set_tenant_id(tenant_id);
+  tenant->set_tenant_key(tenant_key);
+  tenant->set_tenant_key_iv(tenant_key_iv);
+  tenant->set_tenant_key_version(tenant_key_version);
+  UpdateMetadataFormatAndStampUnlock(updated.get());
+
+  return UpdateMetadata(updated);
+}
+
+// Removes the record of 'tenant_id' from a copy of the instance metadata,
+// refreshes the format stamp on that copy and hands it to UpdateMetadata().
+//
+// Returns NotFound if the tenant is unknown, either according to
+// is_tenant_exist() or because it is absent from the metadata snapshot.
+Status FsManager::RemoveTenantMetadata(const string& tenant_id) {
+  if (!is_tenant_exist(tenant_id)) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+
+  unique_ptr<InstanceMetadataPB> metadata(new InstanceMetadataPB);
+  {
+    shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
+    metadata->CopyFrom(*metadata_);
+  }
+
+  bool removed = false;
+  for (int i = 0; i < metadata->tenants_size(); i++) {
+    if (metadata->tenants(i).tenant_id() == tenant_id) {
+      metadata->mutable_tenants()->DeleteSubrange(i, 1);
+      removed = true;
+      break;
+    }
+  }
+  // The existence check above and the snapshot are taken under separate lock
+  // acquisitions, so the tenant may be missing from the snapshot. Do not
+  // restamp and rewrite the metadata when nothing was removed.
+  if (!removed) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+  UpdateMetadataFormatAndStampUnlock(metadata.get());
+
+  return UpdateMetadata(metadata);
+}
+
Status FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
optional<string> tenant_name,
optional<string> tenant_id,
@@ -720,6 +853,7 @@ Status
FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
//
// Files/directories created will NOT be synchronized to disk.
InstanceMetadataPB metadata;
+ string tid = tenant_id ? *tenant_id : fs::kDefaultTenantID;
RETURN_NOT_OK_PREPEND(CreateInstanceMetadata(std::move(uuid),
std::move(tenant_name),
std::move(tenant_id),
@@ -748,14 +882,7 @@ Status
FsManager::CreateInitialFileSystemLayout(optional<string> uuid,
// Create the directory manager.
//
// All files/directories created will be synchronized to disk.
- DataDirManagerOptions dm_opts;
- dm_opts.metric_entity = opts_.metric_entity;
- dm_opts.read_only = opts_.read_only;
- LOG_TIMING(INFO, "creating directory manager") {
- RETURN_NOT_OK_PREPEND(DataDirManager::CreateNew(
- GetEnv(), canonicalized_data_fs_roots_, dm_opts, &dd_manager_),
- "Unable to create directory manager");
- }
+ RETURN_NOT_OK(CreateNewDataDirManager(tid));
if (FLAGS_enable_data_block_fsync) {
// Files/directories created by the directory manager in the fs roots have
@@ -934,7 +1061,15 @@ const string& FsManager::server_key_version() const {
return CHECK_NOTNULL(metadata_.get())->server_key_version();
}
-const int32_t FsManager::tenants_count() const {
+// Returns true if 'tenant_id' may be used: the default tenant always
+// qualifies, any other tenant only when '--enable_multi_tenancy' is set and
+// is_tenant_exist() reports it.
+bool FsManager::VertifyTenant(const std::string& tenant_id) const {
+  return tenant_id == fs::kDefaultTenantID ||
+         (FLAGS_enable_multi_tenancy && is_tenant_exist(tenant_id));
+}
+
+int32 FsManager::tenants_count() const {
shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
return metadata_->tenants_size();
}
@@ -1007,16 +1142,93 @@ string FsManager::tenant_key_version(const string&
tenant_id) const {
return tenant ? tenant->tenant_key_version() : string("");
}
-Env* FsManager::GetEnv(const std::string& tenant_id) {
+// Registers a new tenant: records it in the instance metadata, then sets up
+// its env, encryption key, data dir manager and block manager.
+//
+// If any of 'tenant_key', 'tenant_key_iv' or 'tenant_key_version' is missing
+// and a key provider is configured, the key information is generated through
+// the key provider. Without a key provider, missing values are recorded as
+// empty strings.
+//
+// Requires '--enable_multi_tenancy' (CHECK-enforced). Returns AlreadyPresent
+// if the tenant already exists.
+Status FsManager::AddTenant(const string& tenant_name,
+                            const string& tenant_id,
+                            optional<string> tenant_key,
+                            optional<string> tenant_key_iv,
+                            optional<string> tenant_key_version) {
+  CHECK(FLAGS_enable_multi_tenancy);
+  if (is_tenant_exist(tenant_id)) {
+    return Status::AlreadyPresent(Substitute("Tenant $0 already exists.", tenant_id));
+  }
+
+  const bool key_info_complete = tenant_key && tenant_key_iv && tenant_key_version;
+  // Dereferencing an empty optional is undefined behavior, so give every
+  // missing value an empty string before it is used as an out-parameter or
+  // passed on below.
+  if (!tenant_key) {
+    tenant_key.emplace();
+  }
+  if (!tenant_key_iv) {
+    tenant_key_iv.emplace();
+  }
+  if (!tenant_key_version) {
+    tenant_key_version.emplace();
+  }
+
+  if (!key_info_complete && key_provider_) {
+    // Generate tenant key info if missing something of tenant.
+    RETURN_NOT_OK_PREPEND(
+        key_provider_->GenerateTenantKey(tenant_id,
+                                         &(*tenant_key),
+                                         &(*tenant_key_iv),
+                                         &(*tenant_key_version)),
+        Substitute("Failed to generate encrypted tenant key for tenant: $0.", tenant_id));
+  }
+
+  // Update the metadata.
+  RETURN_NOT_OK_PREPEND(AddTenantMetadata(tenant_name,
+                                          tenant_id,
+                                          *tenant_key,
+                                          *tenant_key_iv,
+                                          *tenant_key_version),
+                        Substitute("Fail to update metadata for add tenant: $0.", tenant_id));
+
+  // NOTE: the metadata has been updated at this point; a failure in any of
+  // the steps below returns an error without undoing that update.
+
+  // Make sure env is available for create dd manager.
+  if (!AddEnv(tenant_id)) {
+    return Status::Corruption(Substitute("Fail to add env for tenant with id: $0.", tenant_id));
+  }
+  RETURN_NOT_OK_PREPEND(SetEncryptionKey(tenant_id),
+                        Substitute("Unable to set encryption key for tenant: $0.", tenant_id));
+
+  // Create new dd manager and add the new dd manager to the dd manager map.
+  //
+  // TODO(kedeng):
+  //     The new tenant should have its own dd manager instead of sharing the default tenant's
+  //     dd manager. This needs to be implemented along with having different storage paths for
+  //     different tenants.
+  RETURN_NOT_OK_PREPEND(OpenDataDirManager(tenant_id),
+                        Substitute("Unable to create and open data dir manager for tenant: $0.",
+                                   tenant_id));
+
+  // Create new block manager and add the new block manager to the block manager map.
+  RETURN_NOT_OK_PREPEND(InitAndOpenBlockManager(nullptr,
+                                                nullptr,
+                                                nullptr,
+                                                tenant_id),
+                        Substitute("Unable to open block manager for tenant: $0.", tenant_id));
+
+  return Status::OK();
+}
+
+// Removes the metadata record of a non-default tenant. Returns NotSupported
+// for the default tenant and for any tenant rejected by VertifyTenant().
+Status FsManager::RemoveTenant(const string& tenant_id) {
+  // VertifyTenant() accepts the default tenant unconditionally, so testing
+  // for it first yields the same outcome for every input.
+  if (tenant_id == fs::kDefaultTenantID) {
+    return Status::NotSupported("Remove default tenant is not allowed.");
+  }
+
+  if (!VertifyTenant(tenant_id)) {
+    return Status::NotSupported(
+        Substitute("Not support for removing tenant for id: $0.", tenant_id));
+  }
+
+  return RemoveTenantMetadata(tenant_id);
+}
+
+// Returns the id of every tenant record in the instance metadata, in the
+// order the records are stored. The metadata is read under the shared lock.
+vector<string> FsManager::GetAllTenants() const {
+  vector<string> ids;
+  shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
+  const int num_tenants = metadata_->tenants_size();
+  ids.reserve(num_tenants);
+  for (int i = 0; i < num_tenants; ++i) {
+    ids.push_back(metadata_->tenants(i).tenant_id());
+  }
+
+  return ids;
+}
+
+Env* FsManager::AddEnv(const std::string& tenant_id) {
if (tenant_id == fs::kDefaultTenantID) {
return env_;
}
std::lock_guard<LockType> lock(env_lock_);
auto env = FindPtrOrNull(env_map_, tenant_id);
- if (!env && !FLAGS_enable_multi_tenancy) {
- LOG(ERROR) << "'--enable_multi_tenancy' is disable for tenant: " <<
tenant_id;
- return nullptr;
+ if (env) {
+ return env.get();
}
// Create new env and add the new env to the env map.
@@ -1026,9 +1238,36 @@ Env* FsManager::GetEnv(const std::string& tenant_id) {
return new_env.get();
}
-vector<string> FsManager::GetDataRootDirs() const {
+// Looks up the env of 'tenant_id'. The default tenant maps to 'env_'; any
+// other tenant is searched in 'env_map_' under 'env_lock_'. Logs an error and
+// returns nullptr when no env is found.
+Env* FsManager::GetEnv(const std::string& tenant_id) const {
+  if (tenant_id != fs::kDefaultTenantID) {
+    std::lock_guard<LockType> lock(env_lock_);
+    if (auto tenant_env = FindPtrOrNull(env_map_, tenant_id)) {
+      return tenant_env.get();
+    }
+
+    LOG(ERROR) << "'The --enable_multi_tenancy' is " << FLAGS_enable_multi_tenancy
+               << " for tenant: " << tenant_id << ", and we fail to search the env.";
+    return nullptr;
+  }
+
+  return env_;
+}
+
+// Returns the data dir manager stored in 'dd_manager_map_' for 'tenant_id',
+// or a null reference if the map has no entry for it. The lookup is done
+// under 'ddm_lock_'.
+scoped_refptr<DataDirManager> FsManager::dd_manager(const string& tenant_id) const {
+  std::lock_guard<LockType> lock(ddm_lock_);
+  return FindPtrOrNull(dd_manager_map_, tenant_id);
+}
+
+// Returns the directories reported by the tenant's data dir manager. Logs an
+// error and returns an empty vector if the tenant fails VertifyTenant() or
+// has no data dir manager registered.
+vector<string> FsManager::GetDataRootDirs(const string& tenant_id) const {
+  if (!VertifyTenant(tenant_id)) {
+    LOG(ERROR) << "Unable to get data root dirs for non existing tenant: "
+               << tenant_id << ", exit.";
+    return {};
+  }
// Get the data subdirectory for each data root.
- return dd_manager_->GetDirs();
+  // dd_manager() yields a null reference when nothing is registered for the
+  // tenant; check before dereferencing.
+  const auto ddm = dd_manager(tenant_id);
+  if (!ddm) {
+    LOG(ERROR) << "No data dir manager is registered for tenant: " << tenant_id;
+    return {};
+  }
+  return ddm->GetDirs();
}
string FsManager::GetTabletMetadataDir() const {
@@ -1123,13 +1362,30 @@ void FsManager::CheckAndFixPermissions() {
}
}
+// Returns the block manager registered for 'tenant_id' (the result of
+// SearchBlockManager()). Logs an error and returns nullptr if the tenant
+// fails VertifyTenant().
+scoped_refptr<BlockManager> FsManager::block_manager(const std::string& tenant_id) const {
+  if (VertifyTenant(tenant_id)) {
+    return SearchBlockManager(tenant_id);
+  }
+
+  LOG(ERROR) << "Do AddTenant for " << tenant_id
+             << " first before calling 'block_manager()'.";
+  return nullptr;
+}
+
// ==========================================================================
// Dump/Debug utils
// ==========================================================================
-void FsManager::DumpFileSystemTree(ostream& out) {
+void FsManager::DumpFileSystemTree(ostream& out, const string& tenant_id) {
DCHECK(initted_);
+ if (!VertifyTenant(tenant_id)) {
+ LOG(ERROR) << "Unable to dump file system tree for non existing tenant: "
+ << tenant_id << ", exit.";
+ return;
+ }
+
for (const auto& root : canonicalized_all_fs_roots_) {
if (!root.status.ok()) {
continue;
@@ -1137,7 +1393,7 @@ void FsManager::DumpFileSystemTree(ostream& out) {
out << "File-System Root: " << root.path << std::endl;
vector<string> objects;
- Status s = GetEnv()->GetChildren(root.path, &objects);
+ Status s = GetEnv(tenant_id)->GetChildren(root.path, &objects);
if (!s.ok()) {
LOG(ERROR) << "Unable to list the fs-tree: " << s.ToString();
return;
@@ -1164,25 +1420,66 @@ void FsManager::DumpFileSystemTree(ostream& out, const
string& prefix,
}
}
+// Decrypts the tenant's key through the key provider and installs it on the
+// tenant's env. Returns OK without doing anything if the tenant is not known
+// to is_tenant_exist() or no key provider is configured.
+//
+// Returns IllegalState if the tenant has no env, or the key provider's error
+// if decryption fails. SetEncryptionKey() calls this while holding
+// 'metadata_rwlock_'.
+Status FsManager::SetEncryptionKeyUnlock(const string& tenant_id) {
+  // Set encryption key for tenant.
+  if (!is_tenant_exist(tenant_id) || !key_provider_) {
+    return Status::OK();
+  }
+
+  // GetEnv() returns nullptr when no env has been added for the tenant.
+  Env* env = GetEnv(tenant_id);
+  if (!env) {
+    return Status::IllegalState(Substitute("No env found for tenant: $0.", tenant_id));
+  }
+
+  string tenant_key;
+  RETURN_NOT_OK(key_provider_->DecryptEncryptionKey(tenant_key_unlock(tenant_id),
+                                                    tenant_key_iv_unlock(tenant_id),
+                                                    tenant_key_version_unlock(tenant_id),
+                                                    &tenant_key));
+  // 'tenant_key' is a hexadecimal string and SetEncryptionKey expects bits
+  // (hex / 2 = bytes * 8 = bits).
+  const string raw_key = a2b_hex(tenant_key);
+  env->SetEncryptionKey(reinterpret_cast<const uint8_t*>(raw_key.c_str()),
+                        tenant_key.length() * 4);
+  return Status::OK();
+}
+
+// Locking wrapper around SetEncryptionKeyUnlock(): takes 'metadata_rwlock_'
+// in shared mode for the duration of the call and returns its status.
+Status FsManager::SetEncryptionKey(const string& tenant_id) {
+ shared_lock<rw_spinlock> md_lock(metadata_rwlock_.get_lock());
+ return SetEncryptionKeyUnlock(tenant_id);
+}
+
// ==========================================================================
// Data read/write interfaces
// ==========================================================================
-Status FsManager::CreateNewBlock(const CreateBlockOptions& opts,
unique_ptr<WritableBlock>* block) {
+// Creates a new block through the block manager of 'tenant_id'.
+//
+// Returns NotFound if the tenant fails VertifyTenant() and IllegalState if
+// the tenant has no block manager registered. The FsManager must not be
+// read-only (CHECK-enforced).
+Status FsManager::CreateNewBlock(const CreateBlockOptions& opts,
+                                 unique_ptr<WritableBlock>* block,
+                                 const string& tenant_id) {
+  if (!VertifyTenant(tenant_id)) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+
+  auto bm = block_manager(tenant_id);
CHECK(!opts_.read_only);
- DCHECK(block_manager_);
- return block_manager_->CreateBlock(opts, block);
+  DCHECK(bm);
+  // DCHECK compiles out in release builds; a verified tenant can still lack
+  // a block manager, so fail with a status instead of dereferencing null.
+  if (!bm) {
+    return Status::IllegalState(
+        Substitute("$0: no block manager is open for tenant", tenant_id));
+  }
+  return bm->CreateBlock(opts, block);
}
-Status FsManager::OpenBlock(const BlockId& block_id,
unique_ptr<ReadableBlock>* block) {
- DCHECK(block_manager_);
- return block_manager_->OpenBlock(block_id, block);
+// Opens 'block_id' for reading through the block manager of 'tenant_id'.
+//
+// Returns NotFound if the tenant fails VertifyTenant() and IllegalState if
+// the tenant has no block manager registered.
+Status FsManager::OpenBlock(const BlockId& block_id,
+                            unique_ptr<ReadableBlock>* block,
+                            const string& tenant_id) {
+  if (!VertifyTenant(tenant_id)) {
+    return Status::NotFound(Substitute("$0: tenant not found", tenant_id));
+  }
+
+  auto bm = block_manager(tenant_id);
+  DCHECK(bm);
+  // DCHECK compiles out in release builds; a verified tenant can still lack
+  // a block manager, so fail with a status instead of dereferencing null.
+  if (!bm) {
+    return Status::IllegalState(
+        Substitute("$0: no block manager is open for tenant", tenant_id));
+  }
+  return bm->OpenBlock(block_id, block);
}
-bool FsManager::BlockExists(const BlockId& block_id) const {
- DCHECK(block_manager_);
+// Returns true if 'block_id' can be opened through the block manager of
+// 'tenant_id'. Returns false if the tenant fails VertifyTenant(), has no
+// block manager registered, or the open attempt fails.
+bool FsManager::BlockExists(const BlockId& block_id, const string& tenant_id) {
+  if (!VertifyTenant(tenant_id)) {
+    return false;
+  }
+
+  auto bm = block_manager(tenant_id);
+  DCHECK(bm);
+  // DCHECK compiles out in release builds; without a block manager the block
+  // cannot be opened, so report it as absent instead of dereferencing null.
+  if (!bm) {
+    return false;
+  }
unique_ptr<ReadableBlock> block;
- return block_manager_->OpenBlock(block_id, &block).ok();
+  return bm->OpenBlock(block_id, &block).ok();
}
} // namespace kudu
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 0761823a9..2b0135d03 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -22,6 +22,7 @@
#include <iosfwd>
#include <map>
#include <memory>
+#include <mutex>
#include <optional>
#include <string>
#include <vector>
@@ -30,12 +31,16 @@
#include <glog/logging.h>
#include <gtest/gtest_prod.h>
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
#include "kudu/fs/dir_manager.h"
#include "kudu/fs/error_manager.h"
+#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/locks.h" // for percpu_rwlock
#include "kudu/util/env.h"
+#include "kudu/util/locks.h" // for percpu_rwlock
#include "kudu/util/metrics.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/path_util.h"
@@ -60,17 +65,13 @@ class Timer;
namespace fs {
-class BlockManager;
-class DataDirManager;
class FsManagerTestBase_TestDuplicatePaths_Test;
class FsManagerTestBase_TestEIOWhileRunningUpdateDirsTool_Test;
class FsManagerTestBase_TestIsolatedMetadataDir_Test;
class FsManagerTestBase_TestMetadataDirInDataRoot_Test;
class FsManagerTestBase_TestMetadataDirInWALRoot_Test;
class FsManagerTestBase_TestOpenWithDuplicateInstanceFiles_Test;
-class ReadableBlock;
-class WritableBlock;
-struct CreateBlockOptions;
+class FsManagerTestBase_TestTenantAccountOperation_Test;
struct FsReport;
} // namespace fs
@@ -250,20 +251,30 @@ class FsManager {
// ==========================================================================
// Creates a new block based on the options specified in 'opts'.
+ // If the tenant id is not specified, we treat it as the default tenant.
//
// Block will be synced on close.
Status CreateNewBlock(const fs::CreateBlockOptions& opts,
- std::unique_ptr<fs::WritableBlock>* block);
+ std::unique_ptr<fs::WritableBlock>* block,
+ const std::string& tenant_id = fs::kDefaultTenantID);
+ // If the tenant id is not specified, we treat it as the default tenant.
Status OpenBlock(const BlockId& block_id,
- std::unique_ptr<fs::ReadableBlock>* block);
+ std::unique_ptr<fs::ReadableBlock>* block,
+ const std::string& tenant_id = fs::kDefaultTenantID);
- bool BlockExists(const BlockId& block_id) const;
+ // If the tenant id is not specified, we treat it as the default tenant.
+ bool BlockExists(const BlockId& block_id,
+ const std::string& tenant_id = fs::kDefaultTenantID);
// ==========================================================================
// on-disk path
// ==========================================================================
- std::vector<std::string> GetDataRootDirs() const;
+ // Get the data subdirectories for each data root, which belong to the tenant
+ // specified by the tenant id.
+ // If the tenant id is not specified, we treat it as the default tenant.
+ std::vector<std::string> GetDataRootDirs(
+ const std::string& tenant_id = fs::kDefaultTenantID) const;
std::string GetWalsRootDir() const {
DCHECK(initted_);
@@ -304,10 +315,10 @@ class FsManager {
// Get env to do read/write.
// Different tenant owns different env.
- // Create a new env for the tenant if search fail when
'--enable_multi_tenancy' enabled.
+ // Returns nullptr if no env has been added for a non-default tenant.
//
// If the tenant id is not specified, we treat it as the default tenant.
- Env* GetEnv(const std::string& tenant_id = fs::kDefaultTenantID);
+ Env* GetEnv(const std::string& tenant_id = fs::kDefaultTenantID) const;
bool read_only() const {
return opts_.read_only;
@@ -325,8 +336,29 @@ class FsManager {
// tenant helpers
// ==========================================================================
+ // A new tenant must have a tenant name and tenant ID. If the tenant key
information
+ // is missing and the key generation service is available, we can use the
relevant
+ // key generation service to generate the key information for it.
+ //
+ // The validation of tenant name and ID needs to be conducted on the master
side.
+ Status AddTenant(const std::string& tenant_name,
+ const std::string& tenant_id,
+ std::optional<std::string> tenant_key,
+ std::optional<std::string> tenant_key_iv,
+ std::optional<std::string> tenant_key_version);
+
+ // As the tenant ID is globally unique and cannot be changed, while the
tenant
+ // name can be changed, we use the tenant ID as the parameter to delete a
tenant.
+ //
+ // TODO(kedeng):
+ // All data owned by a tenant should be deleted when the tenant is
removed.
+ Status RemoveTenant(const std::string& tenant_id);
+
+ // Get all the tenant id including the default tenant.
+ std::vector<std::string> GetAllTenants() const;
+
// Use to get the total count of all the tenants.
- const int32_t tenants_count() const;
+ int32 tenants_count() const;
// Use to confirm whether there is tenants information in metadata.
bool is_tenants_exist() const;
@@ -374,25 +406,42 @@ class FsManager {
// ==========================================================================
// file-system helpers
// ==========================================================================
- bool Exists(const std::string& path) const {
- return env_->FileExists(path);
+ // Used to judge whether a certain path exists.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ bool Exists(const std::string& path,
+ const std::string& tenant_id = fs::kDefaultTenantID) const {
+ return GetEnv(tenant_id)->FileExists(path);
}
- Status ListDir(const std::string& path, std::vector<std::string> *objects)
const {
- return env_->GetChildren(path, objects);
+ // Get the dir list belongs to the tenant.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ Status ListDir(const std::string& path,
+ std::vector<std::string> *objects,
+ const std::string& tenant_id = fs::kDefaultTenantID) const {
+ return GetEnv(tenant_id)->GetChildren(path, objects);
}
- fs::DataDirManager* dd_manager() const {
- return dd_manager_.get();
- }
+ // Search the tenant's dd manager.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ scoped_refptr<fs::DataDirManager> dd_manager(
+ const std::string& tenant_id = fs::kDefaultTenantID) const;
- fs::BlockManager* block_manager() {
- DCHECK(block_manager_);
- return block_manager_.get();
- }
+ // Get block manager by tenant id.
+ // Returns nullptr if the tenant is not valid (see VertifyTenant()) or if no
+ // block manager has been registered for it; call AddTenant() first.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ scoped_refptr<fs::BlockManager> block_manager(
+ const std::string& tenant_id = fs::kDefaultTenantID) const;
// Prints the file system trees under the file system roots.
- void DumpFileSystemTree(std::ostream& out);
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ void DumpFileSystemTree(std::ostream& out,
+ const std::string& tenant_id = fs::kDefaultTenantID);
bool meta_on_xfs() const {
return meta_on_xfs_;
@@ -405,6 +454,7 @@ class FsManager {
FRIEND_TEST(fs::FsManagerTestBase, TestMetadataDirInWALRoot);
FRIEND_TEST(fs::FsManagerTestBase, TestMetadataDirInDataRoot);
FRIEND_TEST(fs::FsManagerTestBase, TestOpenWithDuplicateInstanceFiles);
+ FRIEND_TEST(fs::FsManagerTestBase, TestTenantAccountOperation);
FRIEND_TEST(tserver::MiniTabletServerTest, TestFsLayoutEndToEnd);
friend class itest::MiniClusterFsInspector; // for access to directory names
friend Status tools::UpdateEncryptionKeyInfo(Env* env); // for update the
metadata
@@ -414,9 +464,19 @@ class FsManager {
Status Init();
// Select and create an instance of the appropriate block manager.
+ // Search for the block manager corresponding to the tenant first, if no one
exist,
+ // create a new one and then return it.
//
// Does not actually perform any on-disk operations.
- void InitBlockManager();
+ scoped_refptr<fs::BlockManager> InitBlockManager(
+ const std::string& tenant_id = fs::kDefaultTenantID);
+
+  // Looks up the tenant's block manager in 'block_manager_map_' under
+  // 'bm_lock_'. Yields a null reference if the map has no entry for the tenant.
+  scoped_refptr<fs::BlockManager> SearchBlockManager(const std::string& tenant_id) const {
+    std::lock_guard<LockType> lock(bm_lock_);
+    return FindPtrOrNull(block_manager_map_, tenant_id);
+  }
// Creates filesystem roots from 'canonicalized_roots', writing new on-disk
// instances using 'metadata'.
@@ -444,9 +504,32 @@ class FsManager {
Status WriteInstanceMetadata(const InstanceMetadataPB& metadata,
const std::string& root);
+ // To support multi-tenant scenarios and non-encrypted scenarios, some
interfaces
+ // have added a tenant ID parameter.
+ // This interface is used to confirm the validity of the tenant ID. The
default
+ // tenant ID is valid in any scenario, while a non-default tenant ID is only
valid
+ // when multi-tenant features are enabled.
+ //
+ // If valid, it returns true, otherwise it returns false.
+ bool VertifyTenant(const std::string& tenant_id) const;
+
+ // We record tenant information in metadata and persist it to disk. When we
add a
+ // new tenant, the records on the disk need to be updated at first, and then
+ // update the metadata in memory if success.
+ //
+ // Called only in the encryption enable scenario when a new tenant appears.
+ Status AddTenantMetadata(const std::string& tenant_name,
+ const std::string& tenant_id,
+ const std::string& tenant_key,
+ const std::string& tenant_key_iv,
+ const std::string& tenant_key_version);
+
+ // Remove the tenant recorded in the memory and the disk.
+ Status RemoveTenantMetadata(const std::string& tenant_id);
+
// Update metadata after adding tenant or removing tenant.
Status UpdateMetadata(
- std::unique_ptr<InstanceMetadataPB> metadata);
+ std::unique_ptr<InstanceMetadataPB>& metadata);
// Search tenant for metadata by tenant id.
const InstanceMetadataPB_TenantMetadataPB* GetTenant(const std::string&
tenant_id) const;
@@ -479,6 +562,24 @@ class FsManager {
// file-system helpers
// ==========================================================================
+ // Return the env of the tenant, creating a new one and adding it to the env
+ // map if none exists yet. The default tenant always uses the default env.
+ Env* AddEnv(const std::string& tenant_id);
+
+ // Set encryption key for the env of tenant.
+ Status SetEncryptionKey(const std::string& tenant_id);
+ // Same as SetEncryptionKey(), except that the caller must already hold 'metadata_rwlock_'.
+ Status SetEncryptionKeyUnlock(const std::string& tenant_id);
+
+ // Create data dir manager for tenant and add it to dd manager map.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ Status CreateNewDataDirManager(const std::string& tenant_id =
fs::kDefaultTenantID);
+
+ // Open data dir manager for tenant and add it to dd manager map.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ Status OpenDataDirManager(const std::string& tenant_id =
fs::kDefaultTenantID);
+
// Prints the file system tree for the objects in 'objects' under the given
// 'path'. Prints lines with the given 'prefix'.
void DumpFileSystemTree(std::ostream& out,
@@ -486,6 +587,23 @@ class FsManager {
const std::string& path,
const std::vector<std::string>& objects);
+ // Init and open block manager for tenant and add it to block manager map.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ Status InitAndOpenBlockManager(fs::FsReport* report = nullptr,
+ std::atomic<int>* containers_processed =
nullptr,
+ std::atomic<int>* containers_total = nullptr,
+ const std::string& tenant_id =
fs::kDefaultTenantID,
+ fs::BlockManager::MergeReport need_merage =
+
fs::BlockManager::MergeReport::NOT_REQUIRED);
+
+ // Add data dir manager to the 'dd_manager_map_' keyed by tenant_id.
+ // Return 'AlreadyPresent' if the tenant present.
+ //
+ // If the tenant id is not specified, we treat it as the default tenant.
+ Status AddDataDirManager(scoped_refptr<fs::DataDirManager> dd_manager,
+ const std::string& tenant_id =
fs::kDefaultTenantID);
+
// Deletes leftover temporary files in all "special" top-level directories
// (e.g. WAL root directory).
//
@@ -499,6 +617,9 @@ class FsManager {
// Returns true if 'fname' is a valid tablet ID.
bool IsValidTabletId(const std::string& fname);
+ // Different tenants should own different data storage paths.
+ CanonicalizedRootsList get_canonicalized_data_fs_roots(const std::string&
tenant_id) const;
+
static const char *kDataDirName;
static const char *kTabletMetadataDirName;
static const char *kWalDirName;
@@ -531,14 +652,24 @@ class FsManager {
CanonicalizedRootsList canonicalized_data_fs_roots_;
CanonicalizedRootsList canonicalized_all_fs_roots_;
- // Lock protecting the 'metadata_' below.
+ // Lock protecting the 'metadata_'.
mutable percpu_rwlock metadata_rwlock_;
// Belongs to the default tenant.
std::unique_ptr<InstanceMetadataPB> metadata_;
- std::unique_ptr<fs::FsErrorManager> error_manager_;
- std::unique_ptr<fs::DataDirManager> dd_manager_;
- std::unique_ptr<fs::BlockManager> block_manager_;
+ // Shared by all the block managers.
+ scoped_refptr<fs::FsErrorManager> error_manager_;
+ // Lock protecting 'dd_manager_map_' below.
+ mutable LockType ddm_lock_;
+ typedef scoped_refptr<fs::DataDirManager> ScopedDDManagerPtr;
+ typedef std::map<std::string, ScopedDDManagerPtr> DataDirManagerMap;
+ DataDirManagerMap dd_manager_map_;
+
+ // Lock protecting 'block_manager_map_'.
+ mutable LockType bm_lock_;
+ typedef scoped_refptr<fs::BlockManager> ScopedBlockManagerPtr;
+ typedef std::map<std::string, ScopedBlockManagerPtr> BlockManagerMap;
+ BlockManagerMap block_manager_map_;
std::unique_ptr<security::KeyProvider> key_provider_;
diff --git a/src/kudu/fs/log_block_manager-test.cc
b/src/kudu/fs/log_block_manager-test.cc
index 11d910f46..665d12d1c 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -133,23 +133,26 @@ class LogBlockManagerTest : public KuduTest, public
::testing::WithParamInterfac
// Use a small file cache (smaller than the number of containers).
//
// Not strictly necessary except for
TestDeleteFromContainerAfterMetadataCompaction.
- file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()),
- bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+ file_cache_("test_cache", env_, 50, scoped_refptr<MetricEntity>()) {
CHECK_OK(file_cache_.Init());
+ error_manager_ = make_scoped_refptr(new FsErrorManager());
+ bm_ = CreateBlockManager(scoped_refptr<MetricEntity>());
}
void SetUp() override {
// Pass in a report to prevent the block manager from logging
unnecessarily.
FsReport report;
- ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+ ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_,
&test_group_pb_));
}
protected:
- LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>&
metric_entity,
- vector<string> test_data_dirs = {}) {
+ scoped_refptr<LogBlockManager> CreateBlockManager(
+ const scoped_refptr<MetricEntity>& metric_entity,
+ vector<string> test_data_dirs = {}) {
PrepareDataDirs(&test_data_dirs);
+
if (!dd_manager_) {
// Ensure the directory manager is initialized.
CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs,
@@ -159,8 +162,8 @@ class LogBlockManagerTest : public KuduTest, public
::testing::WithParamInterfac
BlockManagerOptions opts;
opts.metric_entity = metric_entity;
CHECK_EQ(FLAGS_block_manager, "log");
- return new LogBlockManagerNativeMeta(
- env_, dd_manager_.get(), &error_manager_, &file_cache_,
std::move(opts));
+ return make_scoped_refptr(new LogBlockManagerNativeMeta(env_,
dd_manager_.get(), error_manager_,
+ &file_cache_, std::move(opts),
fs::kDefaultTenantID));
}
Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity =
nullptr,
@@ -186,8 +189,8 @@ class LogBlockManagerTest : public KuduTest, public
::testing::WithParamInterfac
RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_,
test_group_pb_));
}
- bm_.reset(CreateBlockManager(metric_entity, test_data_dirs));
- RETURN_NOT_OK(bm_->Open(report, nullptr, nullptr));
+ bm_ = CreateBlockManager(metric_entity, test_data_dirs);
+ RETURN_NOT_OK(bm_->Open(report, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
return Status::OK();
}
@@ -247,10 +250,10 @@ class LogBlockManagerTest : public KuduTest, public
::testing::WithParamInterfac
string test_tablet_name_;
CreateBlockOptions test_block_opts_;
- unique_ptr<DataDirManager> dd_manager_;
- FsErrorManager error_manager_;
+ scoped_refptr<DataDirManager> dd_manager_;
+ scoped_refptr<FsErrorManager> error_manager_;
FileCache file_cache_;
- unique_ptr<LogBlockManager> bm_;
+ scoped_refptr<LogBlockManager> bm_;
private:
enum GetMode {
@@ -1908,11 +1911,12 @@ TEST_P(LogBlockManagerTest,
TestOpenWithFailedDirectories) {
DataDirManagerOptions(), &dd_manager_));
// Wire in a callback to fail data directories.
- error_manager_.SetErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
+ error_manager_->SetErrorNotificationCb(
+ ErrorHandlerType::DISK_ERROR, [this](const string& uuid,
+ const string& /* tenant_id */) {
this->dd_manager_->MarkDirFailedByUuid(uuid);
});
- bm_.reset(CreateBlockManager(nullptr));
+ bm_ = CreateBlockManager(nullptr);
// Fail one of the directories, chosen randomly.
FLAGS_crash_on_eio = false;
@@ -1922,7 +1926,7 @@ TEST_P(LogBlockManagerTest,
TestOpenWithFailedDirectories) {
// Check the report, ensuring the correct directory has failed.
FsReport report;
- ASSERT_OK(bm_->Open(&report, nullptr, nullptr));
+ ASSERT_OK(bm_->Open(&report, BlockManager::MergeReport::NOT_REQUIRED,
nullptr, nullptr));
ASSERT_EQ(kNumDirs - 1, report.data_dirs.size());
for (const string& data_dir : report.data_dirs) {
ASSERT_NE(data_dir, test_dirs[failed_idx]);
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index cc3ec7427..87acadc30 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -578,6 +578,8 @@ class LogBlockContainer: public
RefCountedThreadSafe<LogBlockContainer> {
return read_only_status_;
}
+  // Tenant id of the block manager that owns this container. Returned by
+  // value without a top-level 'const', which would only prevent callers from
+  // moving the result.
+  std::string tenant_id() const { return block_manager_->tenant_id(); }
+
// Simple accessors.
LogBlockManager* block_manager() const { return block_manager_; }
const string& id() const { return id_; }
@@ -883,7 +885,7 @@ class LogBlockContainerNativeMeta final : public
LogBlockContainer {
#define CONTAINER_DISK_FAILURE(status_expr, msg) do { \
Status s_ = (status_expr); \
HANDLE_DISK_FAILURE(s_,
block_manager_->error_manager_->RunErrorNotificationCb( \
- ErrorHandlerType::DISK_ERROR, data_dir_)); \
+ ErrorHandlerType::DISK_ERROR, data_dir_, tenant_id())); \
WARN_NOT_OK(s_, msg); \
} while (0)
@@ -961,7 +963,8 @@ LogBlockContainerNativeMeta::~LogBlockContainerNativeMeta()
{
void LogBlockContainer::HandleError(const Status& s) const {
HANDLE_DISK_FAILURE(s,
block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
- data_dir_));
+ data_dir_,
+ tenant_id()));
}
void LogBlockContainerNativeMeta::CompactMetadata() {
@@ -1029,7 +1032,8 @@ void LogBlockContainerNativeMeta::CompactMetadata() {
#define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-
block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
dir)); \
+ block_manager->error_manager()->RunErrorNotificationCb( \
+ ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id())); \
} while (0)
Status LogBlockContainerNativeMeta::Create(LogBlockManager* block_manager,
@@ -1104,10 +1108,11 @@ Status
LogBlockContainerNativeMeta::Create(LogBlockManager* block_manager,
}
// Prefer metadata status (arbitrarily).
- FsErrorManager* em = block_manager->error_manager();
- HANDLE_DISK_FAILURE(metadata_status,
- em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
dir));
- HANDLE_DISK_FAILURE(data_status,
em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
+ auto em = block_manager->error_manager();
+ HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(
+ ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
+ HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(
+ ErrorHandlerType::DISK_ERROR, dir, block_manager->tenant_id()));
return !metadata_status.ok() ? metadata_status : data_status;
}
@@ -2339,20 +2344,22 @@ const map<int64_t, int64_t>
LogBlockManager::kPerFsBlockSizeBlockLimits({
{ 4096, 2721 }});
LogBlockManager::LogBlockManager(Env* env,
- DataDirManager* dd_manager,
- FsErrorManager* error_manager,
+ scoped_refptr<DataDirManager> dd_manager,
+ scoped_refptr<FsErrorManager> error_manager,
FileCache* file_cache,
- BlockManagerOptions opts)
+ BlockManagerOptions opts,
+ string tenant_id)
: env_(DCHECK_NOTNULL(env)),
- dd_manager_(DCHECK_NOTNULL(dd_manager)),
- error_manager_(DCHECK_NOTNULL(error_manager)),
+ dd_manager_(DCHECK_NOTNULL(std::move(dd_manager))),
+ error_manager_(DCHECK_NOTNULL(std::move(error_manager))),
opts_(std::move(opts)),
mem_tracker_(MemTracker::CreateTracker(-1,
"log_block_manager",
opts_.parent_mem_tracker)),
file_cache_(file_cache),
buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
- next_block_id_(1) {
+ next_block_id_(1),
+ tenant_id_(std::move(tenant_id)) {
managed_block_shards_.resize(kBlockMapChunk);
for (auto& mb : managed_block_shards_) {
mb.lock = unique_ptr<simple_spinlock>(new simple_spinlock());
@@ -2432,7 +2439,8 @@ Status LogBlockManagerNativeMeta::OpenContainer(Dir* dir,
return LogBlockContainerNativeMeta::Open(this, dir, report, id, container);
}
-Status LogBlockManager::Open(FsReport* report, std::atomic<int>*
containers_processed,
+Status LogBlockManager::Open(FsReport* report, MergeReport need_merage,
+ std::atomic<int>* containers_processed,
std::atomic<int>* containers_total) {
// Establish (and log) block limits for each data directory using kernel,
// filesystem, and gflags information.
@@ -2576,7 +2584,11 @@ Status LogBlockManager::Open(FsReport* report,
std::atomic<int>* containers_proc
// Either return or log the report.
if (report) {
- *report = std::move(merged_report);
+ if (need_merage == MergeReport::REQUIRED) {
+ report->MergeFrom(merged_report);
+ } else {
+ *report = std::move(merged_report);
+ }
} else {
RETURN_NOT_OK(merged_report.LogAndCheckForFatalErrors());
}
@@ -2696,7 +2708,9 @@ Status LogBlockManager::GetOrCreateContainer(const
CreateBlockOptions& opts,
LogBlockContainerRefPtr*
container) {
Dir* dir;
RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
-
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
opts.tablet_id));
+
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
+ opts.tablet_id,
+ tenant_id()));
{
std::lock_guard<simple_spinlock> l(lock_);
@@ -2715,7 +2729,8 @@ Status LogBlockManager::GetOrCreateContainer(const
CreateBlockOptions& opts,
// We could create a container in a different directory, but there's
// currently no point in doing so. On disk failure, the tablet specified by
// 'opts' will be shut down, so the returned container would not be used.
- HANDLE_DISK_FAILURE(s,
error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
+ HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
+ ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
RETURN_NOT_OK_PREPEND(s, "Could not create new log block container at " +
dir->dir());
{
std::lock_guard<simple_spinlock> l(lock_);
@@ -2927,7 +2942,9 @@ Status LogBlockManager::RemoveLogBlock(const BlockId&
block_id,
LogBlockContainer* container = it->second->container();
HANDLE_DISK_FAILURE(container->read_only_status(),
- error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
container->data_dir()));
+ error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+ container->data_dir(),
+ tenant_id()));
// Return early if deleting a block in a failed directory.
set<int> failed_dirs = dd_manager_->GetFailedDirs();
@@ -2958,7 +2975,7 @@ void LogBlockManager::OpenDataDir(
Status s = env_->GetChildren(dir->dir(), &children);
if (!s.ok()) {
HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, dir));
+ ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
*result_status = s.CloneAndPrepend(Substitute(
"Could not list children of $0", dir->dir()));
return;
@@ -3114,7 +3131,7 @@ void LogBlockManager::LoadContainer(Dir* dir,
s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
if (!s.ok()) {
HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, dir));
+ ErrorHandlerType::DISK_ERROR, dir, tenant_id()));
result->status = s.CloneAndPrepend(Substitute(
"Could not get on-disk file size of container $0",
container->ToString()));
return;
@@ -3183,13 +3200,13 @@ void LogBlockManager::RepairTask(Dir* dir,
internal::LogBlockContainerLoadResult
Status s_ = (status_expr); \
s_ = s_.CloneAndPrepend(msg); \
RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, \
- error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
dir)); \
+ error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
dir, tenant_id())); \
} while (0)
#define WARN_NOT_OK_LBM_DISK_FAILURE(status_expr, msg) do { \
Status s_ = (status_expr); \
HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb( \
- ErrorHandlerType::DISK_ERROR, dir)); \
+ ErrorHandlerType::DISK_ERROR, dir, tenant_id())); \
WARN_NOT_OK(s_, msg); \
} while (0)
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 830d1798f..0cc8d4d25 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -34,6 +34,8 @@
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
+#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
@@ -49,9 +51,7 @@ class Env;
class FileCache;
namespace fs {
-class DataDirManager;
class Dir;
-class FsErrorManager;
struct FsReport;
namespace internal {
@@ -181,7 +181,8 @@ class LogBlockManager : public BlockManager {
~LogBlockManager() override;
- Status Open(FsReport* report, std::atomic<int>* containers_processed,
+ Status Open(FsReport* report, MergeReport need_merage,
+ std::atomic<int>* containers_processed,
std::atomic<int>* containers_total) override;
Status CreateBlock(const CreateBlockOptions& opts,
@@ -198,16 +199,19 @@ class LogBlockManager : public BlockManager {
void NotifyBlockId(BlockId block_id) override;
- FsErrorManager* error_manager() override { return error_manager_; }
+ scoped_refptr<FsErrorManager> error_manager() override { return
error_manager_; }
+
+ std::string tenant_id() const override { return tenant_id_; }
protected:
// Note: all objects passed as pointers should remain alive for the lifetime
// of the block manager.
LogBlockManager(Env* env,
- DataDirManager* dd_manager,
- FsErrorManager* error_manager,
+ scoped_refptr<DataDirManager> dd_manager,
+ scoped_refptr<FsErrorManager> error_manager,
FileCache* file_cache,
- BlockManagerOptions opts);
+ BlockManagerOptions opts,
+ std::string tenant_id);
FRIEND_TEST(LogBlockManagerTest, TestAbortBlock);
FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock);
@@ -441,10 +445,10 @@ class LogBlockManager : public BlockManager {
// Manages and owns the data directories in which the block manager will
// place its blocks.
- DataDirManager* dd_manager_;
+ scoped_refptr<DataDirManager> dd_manager_;
// Manages callbacks used to handle disk failure.
- FsErrorManager* error_manager_;
+ scoped_refptr<FsErrorManager> error_manager_;
// The options that the LogBlockManager was created with.
const BlockManagerOptions opts_;
@@ -512,6 +516,9 @@ class LogBlockManager : public BlockManager {
// May be null if instantiated without metrics.
std::unique_ptr<internal::LogBlockManagerMetrics> metrics_;
+ // Which tenant this log block manager belongs to.
+ std::string tenant_id_;
+
DISALLOW_COPY_AND_ASSIGN(LogBlockManager);
};
@@ -529,11 +536,13 @@ class LogBlockManager : public BlockManager {
class LogBlockManagerNativeMeta : public LogBlockManager {
public:
LogBlockManagerNativeMeta(Env* env,
- DataDirManager* dd_manager,
- FsErrorManager* error_manager,
+ scoped_refptr<DataDirManager> dd_manager,
+ scoped_refptr<FsErrorManager> error_manager,
FileCache* file_cache,
- BlockManagerOptions opts)
- : LogBlockManager(env, dd_manager, error_manager, file_cache,
std::move(opts)) {
+ BlockManagerOptions opts,
+ std::string tenant_id)
+ : LogBlockManager(env, std::move(dd_manager), std::move(error_manager),
+ file_cache, std::move(opts), std::move(tenant_id)) {
}
private:
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 07925e727..82edad2c0 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -198,15 +198,17 @@ GROUP_FLAG_VALIDATOR(multi_master_pp,
&ValidateMultiMasterProxiedRpcFlags);
constexpr const char* kReplaceMasterMessage =
"this master may return incorrect results and should be replaced";
-void CrashMasterOnDiskError(const string& uuid) {
+void CrashMasterOnDiskError(const string& uuid, const string& /* tenant_id */)
{
LOG(FATAL) << Substitute("Disk error detected on data directory $0: $1",
uuid, kReplaceMasterMessage);
}
-void CrashMasterOnCFileCorruption(const string& tablet_id) {
+void CrashMasterOnCFileCorruption(const string& tablet_id,
+ const string& /* tenant_id */) {
LOG(FATAL) << Substitute("CFile corruption detected on system catalog $0:
$1",
tablet_id, kReplaceMasterMessage);
}
-void CrashMasterOnKudu2233Corruption(const string& tablet_id) {
+void CrashMasterOnKudu2233Corruption(const string& tablet_id,
+ const string& /* tenant_id */) {
LOG(FATAL) << Substitute("KUDU-2233 corruption detected on system catalog
$0: $1 ",
tablet_id, kReplaceMasterMessage);
}
diff --git a/src/kudu/tablet/compaction-test.cc
b/src/kudu/tablet/compaction-test.cc
index e23b3083d..b1856d83b 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1393,7 +1393,7 @@ TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
// Fetch the metric for the number of on-disk blocks, so we can later verify
// that we actually remove data.
fs::LogBlockManager* lbm = down_cast<fs::LogBlockManager*>(
- harness_->fs_manager()->block_manager());
+ harness_->fs_manager()->block_manager().get());
vector<BlockId> before_block_ids;
ASSERT_OK(lbm->GetAllBlockIds(&before_block_ids));
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index b6a31e86c..07598976b 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -984,7 +984,7 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation*
right) {
// If 'old_row' has previous versions, this transforms prior version in undos
// and adds them to 'new_undo_head'.
Status MergeDuplicatedRowHistory(const string& tablet_id,
- const FsErrorManager* error_manager,
+ const scoped_refptr<FsErrorManager>&
error_manager,
CompactionInputRow* old_row,
Mutation** new_undo_head,
Arena* arena) {
@@ -1343,7 +1343,7 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot&
snap,
}
Status FlushCompactionInput(const string& tablet_id,
- const FsErrorManager* error_manager,
+ const scoped_refptr<FsErrorManager>& error_manager,
CompactionInput* input,
const MvccSnapshot& snap,
const HistoryGcOpts& history_gc_opts,
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 967fc70b9..c8e5e4f46 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -28,6 +28,7 @@
#include "kudu/common/rowblock.h"
#include "kudu/common/timestamp.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/rowset.h"
#include "kudu/util/status.h"
@@ -233,7 +234,7 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot&
snap,
// After return of this function, this CompactionInput object is "used up" and will
// no longer be useful.
Status FlushCompactionInput(const std::string& tablet_id,
- const fs::FsErrorManager* error_manager,
+ const scoped_refptr<fs::FsErrorManager>&
error_manager,
CompactionInput* input,
const MvccSnapshot& snap,
const HistoryGcOpts& history_gc_opts,
diff --git a/src/kudu/tablet/delta_compaction.cc
b/src/kudu/tablet/delta_compaction.cc
index 66516d4d9..8f6d27f56 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -21,6 +21,7 @@
#include <map>
#include <ostream>
#include <string>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -38,6 +39,7 @@
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/cfile_set.h"
@@ -50,9 +52,12 @@
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset_metadata.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/trace.h"
+namespace kudu {
+class Arena;
+}
+
using kudu::fs::BlockCreationTransaction;
using kudu::fs::BlockManager;
using kudu::fs::CreateBlockOptions;
@@ -231,7 +236,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const
IOContext* io_context) {
nrows += n;
}
- BlockManager* bm = fs_manager_->block_manager();
+ auto bm = fs_manager_->block_manager();
unique_ptr<BlockCreationTransaction> transaction =
bm->NewCreationTransaction();
RETURN_NOT_OK(base_data_writer_->FinishAndReleaseBlocks(transaction.get()));
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index d0f985f87..38a7f0f79 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -41,6 +41,7 @@
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/cfile_set.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/delta_compaction.h"
@@ -242,7 +243,7 @@ Status DiskRowSetWriter::AppendBlock(const RowBlock &block,
int live_row_count)
Status DiskRowSetWriter::Finish() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::Finish");
- BlockManager* bm = rowset_metadata_->fs_manager()->block_manager();
+ auto bm = rowset_metadata_->fs_manager()->block_manager();
unique_ptr<BlockCreationTransaction> transaction =
bm->NewCreationTransaction();
RETURN_NOT_OK(FinishAndReleaseBlocks(transaction.get()));
return transaction->CommitCreatedBlocks();
@@ -336,7 +337,7 @@ RollingDiskRowSetWriter::RollingDiskRowSetWriter(
can_roll_(false),
written_count_(0),
written_size_(0) {
- BlockManager* bm = tablet_metadata->fs_manager()->block_manager();
+ auto bm = tablet_metadata->fs_manager()->block_manager();
block_transaction_ = bm->NewCreationTransaction();
CHECK(schema.has_column_ids());
}
diff --git a/src/kudu/tablet/tablet_metadata.cc
b/src/kudu/tablet/tablet_metadata.cc
index 706415145..979958edb 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -569,7 +569,7 @@ void TabletMetadata::DeleteOrphanedBlocks(const
BlockIdContainer& blocks) {
return;
}
- BlockManager* bm = fs_manager()->block_manager();
+ auto bm = fs_manager()->block_manager();
shared_ptr<BlockDeletionTransaction> transaction =
bm->NewDeletionTransaction();
for (const BlockId& b : blocks) {
transaction->AddDeletedBlock(b);
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index bc500848e..848064ffc 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -208,7 +208,7 @@ Status UpdateEncryptionKeyInfo(Env* env) {
metadata->clear_server_key_version();
// Write the new metadata to disk.
- RETURN_NOT_OK(fs_manager.UpdateMetadata(move(metadata)));
+ RETURN_NOT_OK(fs_manager.UpdateMetadata(metadata));
return Status::OK();
}
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc
b/src/kudu/tserver/tablet_copy_client-test.cc
index 186a5da78..e938a9ee4 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -223,8 +223,8 @@ Status TabletCopyClientTest::CompareFileContents(const
string& path1, const stri
RETURN_NOT_OK(env_util::OpenFileForRandom(opts, fs_manager_->GetEnv(),
path2, &file2));
uint64_t size1;
- uint64_t size2;
RETURN_NOT_OK(file1->Size(&size1));
+ uint64_t size2;
RETURN_NOT_OK(file2->Size(&size2));
size1 -= file1->GetEncryptionHeaderSize();
size2 -= file2->GetEncryptionHeaderSize();
@@ -547,7 +547,7 @@ TEST_P(TabletCopyClientBasicTest, TestDownloadAllBlocks) {
// stop the copy client and cause it to fail.
TEST_P(TabletCopyClientBasicTest, TestFailedDiskStopsClient) {
ASSERT_OK(StartCopy());
- DataDirManager* dd_manager = fs_manager_->dd_manager();
+ scoped_refptr<DataDirManager> dd_manager = fs_manager_->dd_manager();
// Repeatedly fetch files for the client.
Status s;
diff --git a/src/kudu/tserver/tablet_copy_client.cc
b/src/kudu/tserver/tablet_copy_client.cc
index 9ae5c841b..0edd765f4 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -214,7 +214,7 @@ TabletCopyClient::TabletCopyClient(
start_time_micros_(0),
rng_(GetRandomSeed32()),
dst_tablet_copy_metrics_(dst_tablet_copy_metrics) {
- BlockManager* bm = dst_fs_manager->block_manager();
+ auto bm = dst_fs_manager->block_manager();
transaction_ = bm->NewCreationTransaction();
if (dst_tablet_copy_metrics_) {
dst_tablet_copy_metrics_->open_client_sessions->Increment();
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc
b/src/kudu/tserver/tablet_copy_service-test.cc
index f85bed979..0fa4fd89c 100644
--- a/src/kudu/tserver/tablet_copy_service-test.cc
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -21,8 +21,8 @@
#include <memory>
#include <ostream>
#include <string>
-#include <type_traits>
#include <thread>
+#include <type_traits>
#include <vector>
#include <gflags/gflags_declare.h>
diff --git a/src/kudu/tserver/tablet_server-test.cc
b/src/kudu/tserver/tablet_server-test.cc
index 5a626096f..54bffc3cc 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -724,7 +724,7 @@ class TabletServerDiskSpaceTest : public
TabletServerTestBase,
// and there are additional directories available, directories are added to the
// group, and the new groups are persisted to disk.
TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
- DataDirManager* dd_manager =
mini_server_->server()->fs_manager()->dd_manager();
+ scoped_refptr<DataDirManager> dd_manager =
mini_server_->server()->fs_manager()->dd_manager();
vector<string> dir_group;
ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
ASSERT_EQ(kNumDirs - 1, dir_group.size());
diff --git a/src/kudu/tserver/tablet_server.cc
b/src/kudu/tserver/tablet_server.cc
index 694a4710f..247d86069 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -134,15 +134,17 @@ Status TabletServer::Start() {
CHECK_EQ(kInitialized, state_);
fs_manager_->SetErrorNotificationCb(
- ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
- this->tablet_manager_->FailTabletsInDataDir(uuid);
+ ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string&
tenant_id) {
+ this->tablet_manager_->FailTabletsInDataDir(uuid, tenant_id);
});
fs_manager_->SetErrorNotificationCb(
- ErrorHandlerType::CFILE_CORRUPTION, [this](const string& uuid) {
+ ErrorHandlerType::CFILE_CORRUPTION, [this](const string& uuid,
+ const string& /* tenant_id
*/) {
this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
});
fs_manager_->SetErrorNotificationCb(
- ErrorHandlerType::KUDU_2233_CORRUPTION, [this](const string& uuid) {
+ ErrorHandlerType::KUDU_2233_CORRUPTION, [this](const string& uuid,
+ const string& /*
tenant_id */) {
this->tablet_manager_->FailTabletAndScheduleShutdown(uuid);
});
unique_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
diff --git a/src/kudu/tserver/ts_tablet_manager.cc
b/src/kudu/tserver/ts_tablet_manager.cc
index fa5c9c9bf..7ccc72efb 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1918,8 +1918,9 @@ Status TSTabletManager::DeleteTabletData(
return Status::OK();
}
-void TSTabletManager::FailTabletsInDataDir(const string& uuid) {
- DataDirManager* dd_manager = fs_manager_->dd_manager();
+void TSTabletManager::FailTabletsInDataDir(const string& uuid,
+ const string& tenant_id) {
+ scoped_refptr<DataDirManager> dd_manager =
fs_manager_->dd_manager(tenant_id);
int uuid_idx;
CHECK(dd_manager->FindUuidIndexByUuid(uuid, &uuid_idx))
<< Substitute("No data directory found with UUID $0", uuid);
diff --git a/src/kudu/tserver/ts_tablet_manager.h
b/src/kudu/tserver/ts_tablet_manager.h
index 865441f7f..21e14f089 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -242,7 +242,7 @@ class TSTabletManager : public
tserver::TabletReplicaLookupIf {
void FailTabletAndScheduleShutdown(const std::string& tablet_id);
// Forces shutdown of the tablet replicas in the data dir corresponding to 'uuid'.
- void FailTabletsInDataDir(const std::string& uuid);
+ void FailTabletsInDataDir(const std::string& uuid, const std::string&
tenant_id);
// Refresh the cached counts of tablet states, if the cache is old enough,
// and return the count for tablet state 'st'.