This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 14e16f0d910fd5b50f675178067f4e9c770d9dc3 Author: kedeng <[email protected]> AuthorDate: Thu Jul 6 10:56:31 2023 +0800 [multi-tenancy] support different tenants using their own env The multi-tenancy feature is implemented on top of the data static encryption feature, and data static encryption can also be enabled on its own. In order to maximize compatibility between these two different features, different tenants in the multi-tenant scenario will use different envs, and the envs of different tenants will have their own encryption key initialization. This patch does not modify the existing usage logic and therefore has no new unit tests. The verification of the new feature will be added together with the APIs for adding and deleting tenants. Change-Id: I41f030b608e42e227dd9cdd2f6182a978ddbcd2b Reviewed-on: http://gerrit.cloudera.org:8080/20164 Reviewed-by: Yingchun Lai <[email protected]> Tested-by: Yingchun Lai <[email protected]> --- src/kudu/consensus/consensus_meta.cc | 12 ++-- src/kudu/consensus/log.cc | 20 +++--- src/kudu/consensus/log_reader.cc | 2 +- src/kudu/fs/fs_manager-test.cc | 20 +++--- src/kudu/fs/fs_manager.cc | 82 +++++++++++++++---------- src/kudu/fs/fs_manager.h | 18 +++++- src/kudu/integration-tests/ts_recovery-itest.cc | 8 +-- src/kudu/kserver/kserver.cc | 4 +- src/kudu/master/catalog_manager.cc | 2 +- src/kudu/tablet/tablet_bootstrap-test.cc | 6 +- src/kudu/tablet/tablet_bootstrap.cc | 12 ++-- src/kudu/tablet/tablet_metadata.cc | 10 +-- src/kudu/tools/tool_action_local_replica.cc | 2 +- src/kudu/tserver/tablet_copy_client-test.cc | 10 +-- src/kudu/tserver/tablet_copy_client.cc | 10 +-- src/kudu/tserver/tablet_copy_service-test.cc | 5 +- src/kudu/tserver/tablet_copy_source_session.cc | 2 +- src/kudu/util/env.h | 3 + src/kudu/util/env_posix.cc | 5 ++ 19 files changed, 141 insertions(+), 92 deletions(-) diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc index 8f520fec2..596b28e74 100644 --- 
a/src/kudu/consensus/consensus_meta.cc +++ b/src/kudu/consensus/consensus_meta.cc @@ -295,7 +295,7 @@ Status ConsensusMetadata::Flush(FlushMode flush_mode) { string dir = fs_manager_->GetConsensusMetadataDir(); bool created_dir = false; RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing( - fs_manager_->env(), dir, &created_dir), + fs_manager_->GetEnv(), dir, &created_dir), "Unable to create consensus metadata root dir"); // fsync() parent dir if we had to create the dir. if (PREDICT_FALSE(created_dir)) { @@ -308,7 +308,7 @@ Status ConsensusMetadata::Flush(FlushMode flush_mode) { FLAGS_cmeta_force_fsync || (FLAGS_cmeta_fsync_override_on_xfs && fs_manager_->meta_on_xfs()); string meta_file_path = fs_manager_->GetConsensusMetadataPath(tablet_id_); RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath( - fs_manager_->env(), meta_file_path, pb_, + fs_manager_->GetEnv(), meta_file_path, pb_, flush_mode == OVERWRITE ? pb_util::OVERWRITE : pb_util::NO_OVERWRITE, // We use FLAGS_log_force_fsync_all here because the consensus metadata is // essentially an extension of the primary durability mechanism of the @@ -362,7 +362,7 @@ Status ConsensusMetadata::Create(FsManager* fs_manager, } else { // Sanity check: ensure that there is no cmeta file currently on disk. 
const string& path = fs_manager->GetConsensusMetadataPath(tablet_id); - if (fs_manager->env()->FileExists(path)) { + if (fs_manager->GetEnv()->FileExists(path)) { return Status::AlreadyPresent(Substitute("File $0 already exists", path)); } } @@ -375,7 +375,7 @@ Status ConsensusMetadata::Load(FsManager* fs_manager, const std::string& peer_uuid, scoped_refptr<ConsensusMetadata>* cmeta_out) { scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid)); - RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->env(), + RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->GetEnv(), fs_manager->GetConsensusMetadataPath(tablet_id), &cmeta->pb_, pb_util::SENSITIVE)); @@ -388,7 +388,7 @@ Status ConsensusMetadata::Load(FsManager* fs_manager, Status ConsensusMetadata::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) { string cmeta_path = fs_manager->GetConsensusMetadataPath(tablet_id); - RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteFile(cmeta_path), + RETURN_NOT_OK_PREPEND(fs_manager->GetEnv()->DeleteFile(cmeta_path), Substitute("Unable to delete consensus metadata file for tablet $0", tablet_id)); return Status::OK(); @@ -417,7 +417,7 @@ void ConsensusMetadata::UpdateRoleAndTermCache() { Status ConsensusMetadata::UpdateOnDiskSize() { string path = fs_manager_->GetConsensusMetadataPath(tablet_id_); uint64_t on_disk_size; - RETURN_NOT_OK(fs_manager_->env()->GetFileSize(path, &on_disk_size)); + RETURN_NOT_OK(fs_manager_->GetEnv()->GetFileSize(path, &on_disk_size)); on_disk_size_ = on_disk_size; return Status::OK(); } diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc index 1e497223a..062b496e2 100644 --- a/src/kudu/consensus/log.cc +++ b/src/kudu/consensus/log.cc @@ -26,6 +26,8 @@ #include <type_traits> #include <utility> +#include <boost/iterator/iterator_facade.hpp> +#include <boost/iterator/reverse_iterator.hpp> #include <boost/range/adaptor/reversed.hpp> #include <gflags/gflags.h> @@ -638,7 
+640,7 @@ Status SegmentAllocator::AllocateNewSegment() { string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix); VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl; unique_ptr<RWFile> segment_file; - Env* env = ctx_->fs_manager->env(); + Env* env = ctx_->fs_manager->GetEnv(); RWFileOptions opts; opts.is_sensitive = true; RETURN_NOT_OK_PREPEND(env->NewTempRWFile( @@ -671,7 +673,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment( const auto& tablet_id = ctx_->tablet_id; string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName( tablet_id, active_segment_sequence_number_); - Env* env = ctx_->fs_manager->env(); + Env* env = ctx_->fs_manager->GetEnv(); RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "could not rename next WAL segment"); if (opts_->force_fsync_all) { @@ -761,7 +763,7 @@ Status Log::Open(LogOptions options, scoped_refptr<Log>* log) { string tablet_wal_path = fs_manager->GetTabletWalDir(tablet_id); - RETURN_NOT_OK(env_util::CreateDirIfMissing(fs_manager->env(), tablet_wal_path)); + RETURN_NOT_OK(env_util::CreateDirIfMissing(fs_manager->GetEnv(), tablet_wal_path)); LogContext ctx({ tablet_id, std::move(tablet_wal_path) }); ctx.metric_entity = metric_entity; @@ -789,7 +791,7 @@ Status Log::Init() { CHECK_EQ(kLogInitialized, log_state_); // Init the index. - log_index_.reset(new LogIndex(ctx_.fs_manager->env(), + log_index_.reset(new LogIndex(ctx_.fs_manager->GetEnv(), ctx_.file_cache, ctx_.log_dir)); @@ -1075,7 +1077,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) { // segments_to_delete goes out of scope. 
RETURN_NOT_OK(ctx_.file_cache->DeleteFile(segment->path())); } else { - RETURN_NOT_OK(ctx_.fs_manager->env()->DeleteFile(segment->path())); + RETURN_NOT_OK(ctx_.fs_manager->GetEnv()->DeleteFile(segment->path())); } (*num_gced)++; } @@ -1180,12 +1182,12 @@ Status Log::Close() { bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) { const string wal_dir = fs_manager->GetTabletWalDir(tablet_id); - return fs_manager->env()->FileExists(wal_dir); + return fs_manager->GetEnv()->FileExists(wal_dir); } Status Log::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) { string wal_dir = fs_manager->GetTabletWalDir(tablet_id); - Env* env = fs_manager->env(); + Env* env = fs_manager->GetEnv(); if (!env->FileExists(wal_dir)) { return Status::OK(); } @@ -1213,7 +1215,7 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table string tmp_path = Substitute("$0-$1", recovery_path, GetCurrentTimeMicros()); VLOG(1) << kLogPrefix << "Renaming log recovery dir from " << recovery_path << " to " << tmp_path; - RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(recovery_path, tmp_path), + RETURN_NOT_OK_PREPEND(fs_manager->GetEnv()->RenameFile(recovery_path, tmp_path), Substitute("Could not rename old recovery dir from: $0 to: $1", recovery_path, tmp_path)); @@ -1225,7 +1227,7 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table // We don't need to delete through the file cache; we're guaranteed that // the log has been closed (though this invariant isn't verifiable here // without additional plumbing). 
- RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(tmp_path), + RETURN_NOT_OK_PREPEND(fs_manager->GetEnv()->DeleteRecursively(tmp_path), "Could not remove renamed recovery dir " + tmp_path); VLOG(1) << kLogPrefix << "Completed deletion of old log recovery files and directory " << tmp_path; diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc index 40743e0bc..ea967ab12 100644 --- a/src/kudu/consensus/log_reader.cc +++ b/src/kudu/consensus/log_reader.cc @@ -103,7 +103,7 @@ Status LogReader::Open(FsManager* fs_manager, const scoped_refptr<MetricEntity>& metric_entity, FileCache* file_cache, std::shared_ptr<LogReader>* reader) { - return LogReader::Open(fs_manager->env(), fs_manager->GetTabletWalDir(tablet_id), + return LogReader::Open(fs_manager->GetEnv(), fs_manager->GetTabletWalDir(tablet_id), index, tablet_id, metric_entity, file_cache, reader); } diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc index 9124f38ee..2c380bedb 100644 --- a/src/kudu/fs/fs_manager-test.cc +++ b/src/kudu/fs/fs_manager-test.cc @@ -638,26 +638,26 @@ TEST_P(FsManagerTestBase, TestTmpFilesCleanup) { shared_ptr<WritableFile> tmp_writer; string tmp_path = JoinPathSegments(fs_manager()->GetWalsRootDir(), "wal.kudutmp.file"); - ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->env(), tmp_path, &tmp_writer)); + ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->GetEnv(), tmp_path, &tmp_writer)); tmp_path = JoinPathSegments(fs_manager()->GetDataRootDirs()[0], "data1.kudutmp.file"); - ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->env(), tmp_path, &tmp_writer)); + ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->GetEnv(), tmp_path, &tmp_writer)); tmp_path = JoinPathSegments(fs_manager()->GetConsensusMetadataDir(), "12345.kudutmp.asdfg"); - ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->env(), tmp_path, &tmp_writer)); + ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->GetEnv(), tmp_path, &tmp_writer)); tmp_path = 
JoinPathSegments(fs_manager()->GetTabletMetadataDir(), "12345.kudutmp.asdfg"); - ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->env(), tmp_path, &tmp_writer)); + ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->GetEnv(), tmp_path, &tmp_writer)); // Not a misprint here: checking for just ".kudutmp" as well tmp_path = JoinPathSegments(fs_manager()->GetDataRootDirs()[1], "data2.kudutmp"); - ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->env(), tmp_path, &tmp_writer)); + ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->GetEnv(), tmp_path, &tmp_writer)); // Try with nested directory string nested_dir_path = JoinPathSegments(fs_manager()->GetDataRootDirs()[2], "data4"); - ASSERT_OK(env_util::CreateDirIfMissing(fs_manager()->env(), nested_dir_path)); + ASSERT_OK(env_util::CreateDirIfMissing(fs_manager()->GetEnv(), nested_dir_path)); tmp_path = JoinPathSegments(nested_dir_path, "data4.kudutmp.file"); - ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->env(), tmp_path, &tmp_writer)); + ASSERT_OK(env_util::OpenFileForWrite(fs_manager()->GetEnv(), tmp_path, &tmp_writer)); // Add a loop using symlink string data3_link = JoinPathSegments(nested_dir_path, "data3-link"); @@ -670,7 +670,7 @@ TEST_P(FsManagerTestBase, TestTmpFilesCleanup) { lookup_dirs.emplace_back(fs_manager()->GetTabletMetadataDir()); int n_tmp_files = 0; - ASSERT_OK(CountTmpFiles(fs_manager()->env(), lookup_dirs, &n_tmp_files)); + ASSERT_OK(CountTmpFiles(fs_manager()->GetEnv(), lookup_dirs, &n_tmp_files)); ASSERT_EQ(6, n_tmp_files); // The FsManager should not delete any tmp files if it fails to acquire @@ -683,7 +683,7 @@ TEST_P(FsManagerTestBase, TestTmpFilesCleanup) { ReinitFsManagerWithPaths(wal_path, data_paths); Status s = fs_manager()->Open(); ASSERT_STR_MATCHES(s.ToString(), "Could not lock.*"); - ASSERT_OK(CountTmpFiles(fs_manager()->env(), lookup_dirs, &n_tmp_files)); + ASSERT_OK(CountTmpFiles(fs_manager()->GetEnv(), lookup_dirs, &n_tmp_files)); ASSERT_EQ(6, n_tmp_files); } @@ -692,7 
+692,7 @@ TEST_P(FsManagerTestBase, TestTmpFilesCleanup) { ASSERT_OK(fs_manager()->Open()); n_tmp_files = 0; - ASSERT_OK(CountTmpFiles(fs_manager()->env(), lookup_dirs, &n_tmp_files)); + ASSERT_OK(CountTmpFiles(fs_manager()->GetEnv(), lookup_dirs, &n_tmp_files)); ASSERT_EQ(0, n_tmp_files); } diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc index 7a8ef5983..4a43c74a2 100644 --- a/src/kudu/fs/fs_manager.cc +++ b/src/kudu/fs/fs_manager.cc @@ -183,6 +183,7 @@ using kudu::security::DefaultKeyProvider; using kudu::security::RangerKMSKeyProvider; using std::optional; using std::ostream; +using std::shared_ptr; using std::string; using std::unique_ptr; using std::unordered_map; @@ -294,7 +295,7 @@ Status FsManager::Init() { // Strip the basename when canonicalizing, as it may not exist. The // dirname, however, must exist. string canonicalized; - Status s = env_->Canonicalize(DirName(root), &canonicalized); + Status s = GetEnv()->Canonicalize(DirName(root), &canonicalized); if (PREDICT_FALSE(!s.ok())) { if (s.IsNotFound() || s.IsDiskFailure()) { // If the directory fails to canonicalize due to disk failure, store @@ -337,7 +338,7 @@ Status FsManager::Init() { // If there is already metadata in the first data root, use it. Otherwise, // use the WAL root. 
LOG(INFO) << "Metadata directory not provided"; - if (env_->FileExists(meta_dir_in_data_root)) { + if (GetEnv()->FileExists(meta_dir_in_data_root)) { canonicalized_metadata_fs_root_ = canonicalized_data_fs_roots_[0]; LOG(INFO) << "Using existing metadata directory in first data directory"; } else { @@ -381,10 +382,10 @@ void FsManager::InitBlockManager() { bm_opts.read_only = opts_.read_only; if (opts_.block_manager_type == "file") { block_manager_.reset(new FileBlockManager( - env_, dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts))); + GetEnv(), dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts))); } else if (opts_.block_manager_type == "log") { block_manager_.reset(new LogBlockManagerNativeMeta( - env_, dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts))); + GetEnv(), dd_manager_.get(), error_manager_.get(), opts_.file_cache, std::move(bm_opts))); } else { LOG(FATAL) << "Unknown block_manager_type: " << opts_.block_manager_type; } @@ -398,7 +399,7 @@ Status FsManager::PartialOpen(CanonicalizedRootsList* missing_roots) { continue; } unique_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB); - Status s = pb_util::ReadPBContainerFromPath(env_, GetInstanceMetadataPath(root.path), + Status s = pb_util::ReadPBContainerFromPath(GetEnv(), GetInstanceMetadataPath(root.path), pb.get(), pb_util::NOT_SENSITIVE); if (PREDICT_FALSE(!s.ok())) { if (s.IsNotFound()) { @@ -432,7 +433,7 @@ Status FsManager::PartialOpen(CanonicalizedRootsList* missing_roots) { const auto bad_meta_fs_msg = Substitute("Could not determine file system of metadata directory $0", meta_root_path); - Status s = env_->IsOnXfsFilesystem(meta_root_path, &meta_on_xfs_); + Status s = GetEnv()->IsOnXfsFilesystem(meta_root_path, &meta_on_xfs_); if (FLAGS_cmeta_fsync_override_on_xfs) { // Err on the side of visibility if we expect a behavior change based on // the file system. 
@@ -465,7 +466,7 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files, GetConsensusMetadataDir() }; for (const auto& d : ancillary_dirs) { bool is_dir; - RETURN_NOT_OK_PREPEND(env_->IsDirectory(d, &is_dir), + RETURN_NOT_OK_PREPEND(GetEnv()->IsDirectory(d, &is_dir), Substitute("could not verify required directory $0", d)); if (!is_dir) { return Status::Corruption( @@ -479,12 +480,12 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files, auto deleter = MakeScopedCleanup([&]() { // Delete files first so that the directories will be empty when deleted. for (const auto& f : created_files) { - WARN_NOT_OK(env_->DeleteFile(f), "Could not delete file " + f); + WARN_NOT_OK(GetEnv()->DeleteFile(f), "Could not delete file " + f); } // Delete directories in reverse order since parent directories will have // been added before child directories. for (auto it = created_dirs.rbegin(); it != created_dirs.rend(); it++) { - WARN_NOT_OK(env_->DeleteDir(*it), "Could not delete dir " + *it); + WARN_NOT_OK(GetEnv()->DeleteDir(*it), "Could not delete dir " + *it); } }); @@ -540,9 +541,9 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files, if (!decrypted_key.empty()) { // 'decrypted_key' is a hexadecimal string and SetEncryptionKey expects bits // (hex / 2 = bytes * 8 = bits). - env_->SetEncryptionKey(reinterpret_cast<const uint8_t*>( - a2b_hex(decrypted_key).c_str()), - decrypted_key.length() * 4); + GetEnv()->SetEncryptionKey(reinterpret_cast<const uint8_t*>( + a2b_hex(decrypted_key).c_str()), + decrypted_key.length() * 4); } // Open the directory manager if it has not been opened already. 
@@ -553,7 +554,7 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files, dm_opts.dir_type = opts_.block_manager_type; dm_opts.update_instances = opts_.update_instances; LOG_TIMING(INFO, "opening directory manager") { - RETURN_NOT_OK(DataDirManager::OpenExisting(env_, + RETURN_NOT_OK(DataDirManager::OpenExisting(GetEnv(), canonicalized_data_fs_roots_, dm_opts, &dd_manager_)); } } @@ -602,7 +603,7 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files, if (FLAGS_enable_data_block_fsync) { // Files/directories created by the directory manager in the fs roots have // been synchronized, so now is a good time to sync the roots themselves. - WARN_NOT_OK(env_util::SyncAllParentDirs(env_, created_dirs, created_files), + WARN_NOT_OK(env_util::SyncAllParentDirs(GetEnv(), created_dirs, created_files), "could not sync newly created fs roots"); } @@ -637,11 +638,11 @@ Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB> metadata) { auto rollbacker = MakeScopedCleanup([&]() { // Delete new files first so that the backup files could do rollback. for (const auto& changed : changed_dirs) { - WARN_NOT_OK(env_->DeleteFile(changed.first), + WARN_NOT_OK(GetEnv()->DeleteFile(changed.first), Substitute("Could not delete file $0 for rollback.", changed.first)); VLOG(1) << "Delete file: " << changed.first << " for rollback."; - WARN_NOT_OK(env_->RenameFile(changed.second, changed.first), + WARN_NOT_OK(GetEnv()->RenameFile(changed.second, changed.first), Substitute("Could not rename file $0 for rollback.", changed.second)); VLOG(1) << "Rename file: " << changed.second << " to " << changed.first << " for rollback."; } @@ -654,7 +655,7 @@ Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB> metadata) { // Backup the metadata at first. 
const string old_path = GetInstanceMetadataPath(root.path); string tmp_path = Substitute("$0-$1", old_path, GetCurrentTimeMicros()); - WARN_NOT_OK(env_->RenameFile(old_path, tmp_path), + WARN_NOT_OK(GetEnv()->RenameFile(old_path, tmp_path), Substitute("Could not rename file $0, ", old_path)); changed_dirs[old_path] = tmp_path; // Write the instance metadata with latest data. @@ -671,7 +672,7 @@ Status FsManager::UpdateMetadata(unique_ptr<InstanceMetadataPB> metadata) { // If all op success, remove the backup data. for (const auto& changed : changed_dirs) { - WARN_NOT_OK(env_->DeleteFile(changed.second), Substitute("Could not delete file $0, ", + WARN_NOT_OK(GetEnv()->DeleteFile(changed.second), Substitute("Could not delete file $0, ", changed.second)); } @@ -706,12 +707,12 @@ Status FsManager::CreateInitialFileSystemLayout(optional<string> uuid, auto deleter = MakeScopedCleanup([&]() { // Delete files first so that the directories will be empty when deleted. for (const auto& f : created_files) { - WARN_NOT_OK(env_->DeleteFile(f), "Could not delete file " + f); + WARN_NOT_OK(GetEnv()->DeleteFile(f), "Could not delete file " + f); } // Delete directories in reverse order since parent directories will have // been added before child directories. 
for (auto it = created_dirs.rbegin(); it != created_dirs.rend(); it++) { - WARN_NOT_OK(env_->DeleteDir(*it), "Could not delete dir " + *it); + WARN_NOT_OK(GetEnv()->DeleteDir(*it), "Could not delete dir " + *it); } }); @@ -737,7 +738,7 @@ Status FsManager::CreateInitialFileSystemLayout(optional<string> uuid, GetConsensusMetadataDir() }; for (const string& dir : ancillary_dirs) { bool created; - RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, dir, &created), + RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), dir, &created), Substitute("Unable to create directory $0", dir)); if (created) { created_dirs.emplace_back(dir); @@ -752,14 +753,14 @@ Status FsManager::CreateInitialFileSystemLayout(optional<string> uuid, dm_opts.read_only = opts_.read_only; LOG_TIMING(INFO, "creating directory manager") { RETURN_NOT_OK_PREPEND(DataDirManager::CreateNew( - env_, canonicalized_data_fs_roots_, dm_opts, &dd_manager_), + GetEnv(), canonicalized_data_fs_roots_, dm_opts, &dd_manager_), "Unable to create directory manager"); } if (FLAGS_enable_data_block_fsync) { // Files/directories created by the directory manager in the fs roots have // been synchronized, so now is a good time to sync the roots themselves. - WARN_NOT_OK(env_util::SyncAllParentDirs(env_, created_dirs, created_files), + WARN_NOT_OK(env_util::SyncAllParentDirs(GetEnv(), created_dirs, created_files), "could not sync newly created fs roots"); } @@ -782,12 +783,12 @@ Status FsManager::CreateFileSystemRoots( return Status::IOError("cannot create FS layout; at least one directory " "failed to canonicalize", root.path); } - if (!env_->FileExists(root.path)) { + if (!GetEnv()->FileExists(root.path)) { // We'll create the directory below. 
continue; } bool is_empty; - RETURN_NOT_OK_PREPEND(env_util::IsDirectoryEmpty(env_, root.path, &is_empty), + RETURN_NOT_OK_PREPEND(env_util::IsDirectoryEmpty(GetEnv(), root.path, &is_empty), "unable to check if FSManager root is empty"); if (!is_empty) { non_empty_roots.emplace_back(root.path); @@ -807,7 +808,7 @@ Status FsManager::CreateFileSystemRoots( } string root_name = root.path; bool created; - RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, root_name, &created), + RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), root_name, &created), "unable to create FSManager root"); if (created) { created_dirs->emplace_back(root_name); @@ -907,7 +908,7 @@ Status FsManager::WriteInstanceMetadata(const InstanceMetadataPB& metadata, // The instance metadata is written effectively once per TS, so the // durability cost is negligible. - RETURN_NOT_OK(pb_util::WritePBContainerToPath(env_, path, + RETURN_NOT_OK(pb_util::WritePBContainerToPath(GetEnv(), path, metadata, pb_util::NO_OVERWRITE, pb_util::SYNC, @@ -1006,6 +1007,25 @@ string FsManager::tenant_key_version(const string& tenant_id) const { return tenant ? tenant->tenant_key_version() : string(""); } +Env* FsManager::GetEnv(const std::string& tenant_id) { + if (tenant_id == fs::kDefaultTenantID) { + return env_; + } + + std::lock_guard<LockType> lock(env_lock_); + auto env = FindPtrOrNull(env_map_, tenant_id); + if (!env && !FLAGS_enable_multi_tenancy) { + LOG(ERROR) << "'--enable_multi_tenancy' is disable for tenant: " << tenant_id; + return nullptr; + } + + // Create new env and add the new env to the env map. + LOG(INFO) << "Create new env for tenant: " << tenant_id; + shared_ptr<Env> new_env = Env::NewSharedEnv(); + env_map_[tenant_id] = new_env; + return new_env.get(); +} + vector<string> FsManager::GetDataRootDirs() const { // Get the data subdirectory for each data root. 
return dd_manager_->GetDirs(); @@ -1087,7 +1107,7 @@ void FsManager::CleanTmpFiles() { for (const auto& s : { GetWalsRootDir(), GetTabletMetadataDir(), GetConsensusMetadataDir() }) { - WARN_NOT_OK(env_util::DeleteTmpFilesRecursively(env_, s), + WARN_NOT_OK(env_util::DeleteTmpFilesRecursively(GetEnv(), s), Substitute("Error deleting tmp files in $0", s)); } } @@ -1097,7 +1117,7 @@ void FsManager::CheckAndFixPermissions() { if (!root.status.ok()) { continue; } - WARN_NOT_OK(env_->EnsureFileModeAdheresToUmask(root.path), + WARN_NOT_OK(GetEnv()->EnsureFileModeAdheresToUmask(root.path), Substitute("could not check and fix permissions for path: $0", root.path)); } @@ -1117,7 +1137,7 @@ void FsManager::DumpFileSystemTree(ostream& out) { out << "File-System Root: " << root.path << std::endl; vector<string> objects; - Status s = env_->GetChildren(root.path, &objects); + Status s = GetEnv()->GetChildren(root.path, &objects); if (!s.ok()) { LOG(ERROR) << "Unable to list the fs-tree: " << s.ToString(); return; @@ -1134,7 +1154,7 @@ void FsManager::DumpFileSystemTree(ostream& out, const string& prefix, vector<string> sub_objects; string sub_path = JoinPathSegments(path, name); - Status s = env_->GetChildren(sub_path, &sub_objects); + Status s = GetEnv()->GetChildren(sub_path, &sub_objects); if (s.ok()) { out << prefix << name << "/" << std::endl; DumpFileSystemTree(out, prefix + "---", sub_path, sub_objects); diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h index 40dd40c95..0761823a9 100644 --- a/src/kudu/fs/fs_manager.h +++ b/src/kudu/fs/fs_manager.h @@ -20,6 +20,7 @@ #include <atomic> #include <cstdint> #include <iosfwd> +#include <map> #include <memory> #include <optional> #include <string> @@ -301,7 +302,12 @@ class FsManager { return JoinPathSegments(GetConsensusMetadataDir(), tablet_id); } - Env* env() { return env_; } + // Get env to do read/write. + // Different tenant owns different env. 
+ // Create a new env for the tenant if search fail when '--enable_multi_tenancy' enabled. + // + // If the tenant id is not specified, we treat it as the default tenant. + Env* GetEnv(const std::string& tenant_id = fs::kDefaultTenantID); bool read_only() const { return opts_.read_only; @@ -499,8 +505,18 @@ class FsManager { static const char *kInstanceMetadataFileName; static const char *kConsensusMetadataDirName; + typedef rw_spinlock LockType; + + // Lock protecting the env_map_ below. + mutable LockType env_lock_; // The environment to be used for all filesystem operations. + // Different tenant use different env. + typedef std::map<std::string, std::shared_ptr<Env>> EnvMap; + // The environment for default tenant, which belongs to kudu and must never be deleted. Env* env_; + // The map records the env information of all tenants except the default tenant. + // They must be destroyed when not used anymore. + EnvMap env_map_; // The options that the FsManager was created with. const FsManagerOpts opts_; diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc index ea499004f..31d630c3f 100644 --- a/src/kudu/integration-tests/ts_recovery-itest.cc +++ b/src/kudu/integration-tests/ts_recovery-itest.cc @@ -213,7 +213,7 @@ TEST_F(TsRecoveryITest, TestTabletRecoveryAfterSegmentDelete) { string segment = fs_manager->GetWalSegmentFileName(tablet_id, 2); LOG(INFO) << "Deleting WAL segment: " << segment; - ASSERT_OK(fs_manager->env()->DeleteFile(segment)); + ASSERT_OK(fs_manager->GetEnv()->DeleteFile(segment)); } ASSERT_OK(cluster_->Restart()); @@ -628,7 +628,7 @@ TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) { // been written, thus avoiding the check. 
string wal_dir = fs_manager->GetTabletWalDir(tablet_id); vector<string> wal_children; - ASSERT_OK(fs_manager->env()->GetChildren(wal_dir, &wal_children)); + ASSERT_OK(fs_manager->GetEnv()->GetChildren(wal_dir, &wal_children)); // Skip '.', '..', and index files. std::unordered_set<string> wal_segments; for (const auto& filename : wal_children) { @@ -640,9 +640,9 @@ TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) { << wal_children; // If WAL segment index 1 exists, delete it. string first_segment = fs_manager->GetWalSegmentFileName(tablet_id, 1); - if (fs_manager->env()->FileExists(first_segment)) { + if (fs_manager->GetEnv()->FileExists(first_segment)) { LOG(INFO) << "Deleting first WAL segment: " << first_segment; - ASSERT_OK(fs_manager->env()->DeleteFile(first_segment)); + ASSERT_OK(fs_manager->GetEnv()->DeleteFile(first_segment)); } // We also need to update the ConsensusMetadata to match with the term we diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc index 73021ae5c..cb130a34b 100644 --- a/src/kudu/kserver/kserver.cc +++ b/src/kudu/kserver/kserver.cc @@ -24,6 +24,7 @@ #include <mutex> #include <ostream> #include <string> +#include <type_traits> #include <gflags/gflags.h> #include <glog/logging.h> @@ -31,7 +32,6 @@ #include "kudu/fs/fs_manager.h" #include "kudu/gutil/strings/numbers.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/rpc/messenger.h" #include "kudu/util/env.h" #include "kudu/util/faststring.h" #include "kudu/util/flag_tags.h" @@ -159,7 +159,7 @@ Status KuduServer::Init() { // These pools are shared by all replicas hosted by this server, and thus // are capped at a portion of the overall per-euid thread resource limit. 
- auto server_wide_pool_limit = GetThreadPoolThreadLimit(fs_manager_->env()); + auto server_wide_pool_limit = GetThreadPoolThreadLimit(fs_manager_->GetEnv()); LOG(INFO) << Substitute("Server-wide thread pool size limit: $0", server_wide_pool_limit); RETURN_NOT_OK(ThreadPoolBuilder("prepare") diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 18340b4c8..e1cb03d3f 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -1031,7 +1031,7 @@ CatalogManager::CatalogManager(Master* master) ipki_private_key_password_(""), tsk_private_key_password_("") { if (RangerAuthzProvider::IsEnabled()) { - authz_provider_.reset(new RangerAuthzProvider(master_->fs_manager()->env(), + authz_provider_.reset(new RangerAuthzProvider(master_->fs_manager()->GetEnv(), master_->metric_entity())); } else { authz_provider_.reset(new DefaultAuthzProvider); diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc index 75c042741..055d12680 100644 --- a/src/kudu/tablet/tablet_bootstrap-test.cc +++ b/src/kudu/tablet/tablet_bootstrap-test.cc @@ -23,6 +23,7 @@ #include <optional> #include <ostream> #include <string> +#include <type_traits> #include <utility> #include <vector> @@ -47,7 +48,6 @@ #include "kudu/consensus/log-test-base.h" #include "kudu/consensus/log.h" #include "kudu/consensus/log_anchor_registry.h" -#include "kudu/consensus/log_reader.h" #include "kudu/consensus/log_util.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/opid.pb.h" @@ -324,7 +324,7 @@ TEST_F(BootstrapTest, TestOrphanCommit) { ASSERT_OK(AppendCommit(opid)); log::SegmentSequence segments; log_->reader()->GetSegmentsSnapshot(&segments); - fs_manager_->env()->DeleteFile(segments[0]->path()); + fs_manager_->GetEnv()->DeleteFile(segments[0]->path()); // Untrack the tablet in the data dir manager so upon the next call to // BootstrapTestTablet, the tablet metadata's data dir group can be loaded. 
@@ -392,7 +392,7 @@ TEST_F(BootstrapTest, TestNonOrphansAfterOrphanCommit) { log::SegmentSequence segments; log_->reader()->GetSegmentsSnapshot(&segments); - fs_manager_->env()->DeleteFile(segments[0]->path()); + fs_manager_->GetEnv()->DeleteFile(segments[0]->path()); current_index_ += 2; diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index fea9b8f10..a5af0f0ec 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -727,11 +727,11 @@ Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) { if (fs_manager->Exists(log_dir)) { LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in " << log_dir; - RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteRecursively(log_dir), + RETURN_NOT_OK_PREPEND(fs_manager->GetEnv()->DeleteRecursively(log_dir), "Could not recursively delete old log dir " + log_dir); } - RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->env(), log_dir), + RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->GetEnv(), log_dir), "Failed to create log directory " + log_dir); *needs_recovery = true; @@ -741,7 +741,7 @@ Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) { // If we made it here, there was no pre-existing recovery dir. // Now we look for log files in log_dir, and if we find any then we rename // the whole log_dir to a recovery dir and return needs_recovery = true. - RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->env(), log_dir), + RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(fs_manager->GetEnv(), log_dir), "Failed to create log dir"); vector<string> children; @@ -764,10 +764,10 @@ Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) { // and then re-create the log directory. 
VLOG_WITH_PREFIX(1) << "Moving log directory " << log_dir << " to recovery directory " << recovery_path << " in preparation for log replay"; - RETURN_NOT_OK_PREPEND(fs_manager->env()->RenameFile(log_dir, recovery_path), + RETURN_NOT_OK_PREPEND(fs_manager->GetEnv()->RenameFile(log_dir, recovery_path), Substitute("Could not move log directory $0 to recovery dir $1", log_dir, recovery_path)); - RETURN_NOT_OK_PREPEND(fs_manager->env()->CreateDir(log_dir), + RETURN_NOT_OK_PREPEND(fs_manager->GetEnv()->CreateDir(log_dir), "Failed to recreate log directory " + log_dir); } return Status::OK(); @@ -783,7 +783,7 @@ Status TabletBootstrap::OpenLogReaderInRecoveryDir() { // isn't fsynced() during writing, its contents are useless to us. scoped_refptr<LogIndex> log_index(nullptr); const string recovery_dir = fs_manager->GetTabletWalRecoveryDir(tablet_id); - RETURN_NOT_OK_PREPEND(LogReader::Open(fs_manager->env(), recovery_dir, log_index, tablet_id, + RETURN_NOT_OK_PREPEND(LogReader::Open(fs_manager->GetEnv(), recovery_dir, log_index, tablet_id, tablet_->GetMetricEntity().get(), file_cache_, &log_reader_), diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index 0c8065227..706415145 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -114,7 +114,7 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager, scoped_refptr<TabletMetadata>* metadata) { // Verify that no existing tablet exists with the same ID. 
- if (fs_manager->env()->FileExists(fs_manager->GetTabletMetadataPath(tablet_id))) { + if (fs_manager->GetEnv()->FileExists(fs_manager->GetTabletMetadataPath(tablet_id))) { return Status::AlreadyPresent("Tablet already exists", tablet_id); } @@ -302,7 +302,7 @@ Status TabletMetadata::DeleteSuperBlock() { } string path = fs_manager_->GetTabletMetadataPath(tablet_id_); - RETURN_NOT_OK_PREPEND(fs_manager_->env()->DeleteFile(path), + RETURN_NOT_OK_PREPEND(fs_manager_->GetEnv()->DeleteFile(path), "Unable to delete superblock for tablet " + tablet_id_); return Status::OK(); } @@ -375,7 +375,7 @@ Status TabletMetadata::LoadFromDisk() { Status TabletMetadata::UpdateOnDiskSize() { string path = fs_manager_->GetTabletMetadataPath(tablet_id_); uint64_t on_disk_size; - RETURN_NOT_OK(fs_manager()->env()->GetFileSize(path, &on_disk_size)); + RETURN_NOT_OK(fs_manager()->GetEnv()->GetFileSize(path, &on_disk_size)); on_disk_size_.store(on_disk_size, memory_order_relaxed); return Status::OK(); } @@ -708,7 +708,7 @@ Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) { string path = fs_manager_->GetTabletMetadataPath(tablet_id_); RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath( - fs_manager_->env(), path, pb, + fs_manager_->GetEnv(), path, pb, pb_util::OVERWRITE, pb_util::SYNC, pb_util::SENSITIVE), Substitute("Failed to write tablet metadata $0", tablet_id_)); @@ -731,7 +731,7 @@ std::optional<consensus::OpId> TabletMetadata::tombstone_last_logged_opid() cons Status TabletMetadata::ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const { string path = fs_manager_->GetTabletMetadataPath(tablet_id_); RETURN_NOT_OK_PREPEND( - pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, superblock, pb_util::SENSITIVE), + pb_util::ReadPBContainerFromPath(fs_manager_->GetEnv(), path, superblock, pb_util::SENSITIVE), Substitute("Could not load tablet metadata from $0", path)); return Status::OK(); } diff --git 
a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index b3c85bb03..8d92780a3 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -509,7 +509,7 @@ Status PrintReplicaUuids(const RunnerContext& context) { Status BackupConsensusMetadata(FsManager* fs_manager, const string& tablet_id) { - Env* env = fs_manager->env(); + Env* env = fs_manager->GetEnv(); string cmeta_filename = fs_manager->GetConsensusMetadataPath(tablet_id); string backup_filename = Substitute("$0.pre_rewrite.$1", cmeta_filename, env->NowMicros()); WritableFileOptions opts; diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc index 56abb4a5a..186a5da78 100644 --- a/src/kudu/tserver/tablet_copy_client-test.cc +++ b/src/kudu/tserver/tablet_copy_client-test.cc @@ -215,13 +215,15 @@ class TabletCopyClientTest : public TabletCopyTest { }; Status TabletCopyClientTest::CompareFileContents(const string& path1, const string& path2) { - shared_ptr<RandomAccessFile> file1, file2; + shared_ptr<RandomAccessFile> file1; + shared_ptr<RandomAccessFile> file2; RandomAccessFileOptions opts; opts.is_sensitive = true; - RETURN_NOT_OK(env_util::OpenFileForRandom(opts, fs_manager_->env(), path1, &file1)); - RETURN_NOT_OK(env_util::OpenFileForRandom(opts, fs_manager_->env(), path2, &file2)); + RETURN_NOT_OK(env_util::OpenFileForRandom(opts, fs_manager_->GetEnv(), path1, &file1)); + RETURN_NOT_OK(env_util::OpenFileForRandom(opts, fs_manager_->GetEnv(), path2, &file2)); - uint64_t size1, size2; + uint64_t size1; + uint64_t size2; RETURN_NOT_OK(file1->Size(&size1)); RETURN_NOT_OK(file2->Size(&size2)); size1 -= file1->GetEncryptionHeaderSize(); diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc index 47846bc59..7c4e872e3 100644 --- a/src/kudu/tserver/tablet_copy_client.cc +++ b/src/kudu/tserver/tablet_copy_client.cc @@ -530,7 +530,7 @@ 
Status TabletCopyClient::Finish() { string meta_copy_path = Substitute("$0.copy.$1$2", meta_path, start_time_micros_, kTmpInfix); WritableFileOptions opts; opts.is_sensitive = true; - RETURN_NOT_OK_PREPEND(env_util::CopyFile(dst_fs_manager_->env(), meta_path, + RETURN_NOT_OK_PREPEND(env_util::CopyFile(dst_fs_manager_->GetEnv(), meta_path, meta_copy_path, opts), "Unable to make copy of tablet metadata"); } @@ -626,8 +626,8 @@ Status TabletCopyClient::DownloadWALs() { // not kept from previous copies and runs. RETURN_NOT_OK(log::Log::DeleteOnDiskData(dst_fs_manager_, tablet_id_)); string path = dst_fs_manager_->GetTabletWalDir(tablet_id_); - RETURN_NOT_OK(dst_fs_manager_->env()->CreateDir(path)); - RETURN_NOT_OK(dst_fs_manager_->env()->SyncDir(DirName(path))); // fsync() parent dir. + RETURN_NOT_OK(dst_fs_manager_->GetEnv()->CreateDir(path)); + RETURN_NOT_OK(dst_fs_manager_->GetEnv()->SyncDir(DirName(path))); // fsync() parent dir. // Download the WAL segments. const int num_segments = wal_seqnos_.size(); @@ -810,7 +810,7 @@ Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) { opts.sync_on_close = true; opts.is_sensitive = true; unique_ptr<WritableFile> writer; - RETURN_NOT_OK_PREPEND(dst_fs_manager_->env()->NewWritableFile(opts, dest_path, &writer), + RETURN_NOT_OK_PREPEND(dst_fs_manager_->GetEnv()->NewWritableFile(opts, dest_path, &writer), "Unable to open file for writing"); RETURN_NOT_OK_PREPEND(TransferFile(data_id, writer.get()), Substitute("Unable to download WAL segment with seq. 
number $0", @@ -839,7 +839,7 @@ Status TabletCopyClient::WriteConsensusMetadata() { string cmeta_copy_path = Substitute("$0.copy.$1$2", cmeta_path, start_time_micros_, kTmpInfix); WritableFileOptions opts; opts.is_sensitive = true; - RETURN_NOT_OK_PREPEND(env_util::CopyFile(dst_fs_manager_->env(), cmeta_path, + RETURN_NOT_OK_PREPEND(env_util::CopyFile(dst_fs_manager_->GetEnv(), cmeta_path, cmeta_copy_path, opts), "Unable to make copy of consensus metadata"); } diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc index d7a0714d1..f85bed979 100644 --- a/src/kudu/tserver/tablet_copy_service-test.cc +++ b/src/kudu/tserver/tablet_copy_service-test.cc @@ -21,6 +21,7 @@ #include <memory> #include <ostream> #include <string> +#include <type_traits> #include <thread> #include <vector> @@ -32,7 +33,7 @@ #include "kudu/common/wire_protocol.h" #include "kudu/consensus/log.h" #include "kudu/consensus/log.pb.h" -#include "kudu/consensus/log_reader.h" +#include "kudu/consensus/log_reader.h" // IWYU pragma: keep #include "kudu/consensus/log_util.h" #include "kudu/fs/block_id.h" #include "kudu/fs/data_dirs.h" @@ -392,7 +393,7 @@ TEST_F(TabletCopyServiceTest, TestFetchInvalidBlockOffset) { RpcController controller; // Impossible offset. 
uint64_t offset = std::numeric_limits<uint64_t>::max() - - mini_server_->server()->fs_manager()->env()->GetEncryptionHeaderSize(); + mini_server_->server()->fs_manager()->GetEnv()->GetEncryptionHeaderSize(); Status status = DoFetchData(session_id, AsDataTypeId(FirstColumnBlockId(superblock)), &offset, nullptr, &resp, &controller); NO_FATALS(AssertRemoteError(status, controller.error_response(), diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc index aaa227d87..645392328 100644 --- a/src/kudu/tserver/tablet_copy_source_session.cc +++ b/src/kudu/tserver/tablet_copy_source_session.cc @@ -519,7 +519,7 @@ Status LocalTabletCopySourceSession::InitOnce() { // Read the SuperBlock from disk. string path = fs_manager_->GetTabletMetadataPath(tablet_id_); RETURN_NOT_OK_PREPEND( - pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, &tablet_superblock_, + pb_util::ReadPBContainerFromPath(fs_manager_->GetEnv(), path, &tablet_superblock_, pb_util::SENSITIVE), Substitute("Unable to access superblock for tablet $0", tablet_id_)); diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h index 15d1c13dc..be1dd82ca 100644 --- a/src/kudu/util/env.h +++ b/src/kudu/util/env.h @@ -80,6 +80,9 @@ class Env { // environment. Unlike the default env, this is not owned by Kudu, and // must be destroyed when not used anymore. static std::unique_ptr<Env> NewEnv(); + // Same as the usage of "NewEnv()", it also needs to be destroyed when + // no longer used. + static std::shared_ptr<Env> NewSharedEnv(); // Create a brand new sequentially-readable file with the specified name. // On success, stores a pointer to the new file in *result and returns OK. 
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc index 9b7eb936d..2fc0cdc32 100644 --- a/src/kudu/util/env_posix.cc +++ b/src/kudu/util/env_posix.cc @@ -90,6 +90,7 @@ using base::subtle::Atomic64; using base::subtle::Barrier_AtomicIncrement; using std::accumulate; +using std::shared_ptr; using std::string; using std::unique_ptr; using std::vector; @@ -2433,6 +2434,10 @@ unique_ptr<Env> Env::NewEnv() { return unique_ptr<Env>(new PosixEnv()); } +shared_ptr<Env> Env::NewSharedEnv() { + return shared_ptr<Env>(new PosixEnv()); +} + std::ostream& operator<<(std::ostream& o, Env::ResourceLimitType t) { return o << ResourceLimitTypeToString(t); }
