This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d552d7e1f65c1d68e7de57a376adfb9802cb9211 Author: Yingchun Lai <[email protected]> AuthorDate: Sun Aug 29 23:28:57 2021 +0800 [fs] Refactor fs module Only some code style refactoring on fs module, without any functional changes. Change-Id: Iaa45637494b36cc1f4f00affdf995a25eb6231bc Reviewed-on: http://gerrit.cloudera.org:8080/17845 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- src/kudu/fs/block_id.cc | 8 +++- src/kudu/fs/dir_manager.cc | 42 +++++++++---------- src/kudu/fs/dir_manager.h | 11 +++-- src/kudu/fs/file_block_manager.cc | 4 +- src/kudu/fs/fs_manager.cc | 5 --- src/kudu/fs/fs_manager.h | 4 -- src/kudu/fs/fs_report.cc | 27 ++++++------ src/kudu/fs/log_block_manager.cc | 86 +++++++++++++++++++-------------------- src/kudu/fs/log_block_manager.h | 5 +-- 9 files changed, 92 insertions(+), 100 deletions(-) diff --git a/src/kudu/fs/block_id.cc b/src/kudu/fs/block_id.cc index 8b82117..bb20e81 100644 --- a/src/kudu/fs/block_id.cc +++ b/src/kudu/fs/block_id.cc @@ -17,10 +17,12 @@ #include "kudu/fs/block_id.h" -#include <glog/logging.h> +#include <ostream> #include <string> #include <vector> +#include <glog/logging.h> + #include "kudu/fs/fs.pb.h" #include "kudu/gutil/strings/join.h" @@ -31,6 +33,10 @@ namespace kudu { const uint64_t BlockId::kInvalidId = 0; +std::ostream& operator<<(std::ostream& o, const BlockId& block_id) { + return o << block_id.ToString(); +} + string BlockId::JoinStrings(const vector<BlockId>& blocks) { vector<string> strings; strings.reserve(blocks.size()); diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc index 6a47ed9..e53be7c 100644 --- a/src/kudu/fs/dir_manager.cc +++ b/src/kudu/fs/dir_manager.cc @@ -224,9 +224,9 @@ Status DirManager::Create() { RETURN_NOT_OK_PREPEND(r.status, "Could not create directory manager with disks failed"); } vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances; - bool has_existing_instances; - RETURN_NOT_OK(LoadInstances(&loaded_instances, &has_existing_instances)); - if (has_existing_instances) { + bool has_healthy_instances = true; + RETURN_NOT_OK(LoadInstances(&loaded_instances, &has_healthy_instances)); + if (has_healthy_instances) { return Status::AlreadyPresent("instance files already exist"); } @@ -414,7 +414,9 @@ Status DirManager::UpdateHealthyInstances( Status DirManager::LoadInstances( vector<unique_ptr<DirInstanceMetadataFile>>* instance_files, - bool* has_existing_instances) { + bool* has_healthy_instances) { + DCHECK(instance_files); + LockMode lock_mode; if (!lock_dirs()) { lock_mode = LockMode::NONE; @@ -423,11 +425,9 @@ Status DirManager::LoadInstances( } else { lock_mode = LockMode::MANDATORY; } - vector<string> missing_roots_tmp; vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances; ObjectIdGenerator gen; - for (int i = 0; i < canonicalized_fs_roots_.size(); i++) { - const auto& root = canonicalized_fs_roots_[i]; + for (const auto& root : canonicalized_fs_roots_) { string dir = JoinPathSegments(root.path, dir_name()); string instance_filename = JoinPathSegments(dir, instance_metadata_filename()); @@ -470,10 +470,10 @@ Status DirManager::LoadInstances( num_healthy_instances++; } } - if (has_existing_instances) { - *has_existing_instances = num_healthy_instances > 0; + if (has_healthy_instances) { + *has_healthy_instances = num_healthy_instances > 0; } - instance_files->swap(loaded_instances); + *instance_files = std::move(loaded_instances); return Status::OK(); } @@ -507,12 +507,12 @@ Status DirManager::Open() { canonicalized_fs_roots_.size(), max_dirs())); } - vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances; // Load the instance files from disk. - bool has_existing_instances; - RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_existing_instances), + bool has_healthy_instances = true; + vector<unique_ptr<DirInstanceMetadataFile>> loaded_instances; + RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_healthy_instances), "failed to load instance files"); - if (!has_existing_instances) { + if (!has_healthy_instances) { return Status::NotFound( "could not open directory manager, no healthy directories found"); } @@ -521,15 +521,16 @@ Status DirManager::Open() { if (!opts_.read_only && opts_.dir_type != "file" && opts_.update_instances != UpdateInstanceBehavior::DONT_UPDATE) { RETURN_NOT_OK_PREPEND( - CreateNewDirectoriesAndUpdateInstances( - std::move(loaded_instances)), - "could not add new directories"); - RETURN_NOT_OK_PREPEND(LoadInstances(&loaded_instances, &has_existing_instances), + CreateNewDirectoriesAndUpdateInstances(std::move(loaded_instances)), + "could not add new directories"); + vector<unique_ptr<DirInstanceMetadataFile>> new_loaded_instances; + RETURN_NOT_OK_PREPEND(LoadInstances(&new_loaded_instances, &has_healthy_instances), "failed to load instance files after updating"); - if (!has_existing_instances) { + if (!has_healthy_instances) { return Status::IOError( "could not open directory manager, no healthy directories found"); } + loaded_instances = std::move(new_loaded_instances); } // All instances are present and accounted for. Time to create the in-memory @@ -564,7 +565,7 @@ Status DirManager::Open() { // Create a per-dir thread pool. unique_ptr<ThreadPool> pool; - RETURN_NOT_OK(ThreadPoolBuilder(Substitute("dir $0", i)) + RETURN_NOT_OK(ThreadPoolBuilder(Substitute("dir $0", dir)) .set_max_threads(num_threads_per_dir_) .set_trace_metric_prefix("dirs") .Build(&pool)); @@ -673,7 +674,6 @@ Status DirManager::MarkDirFailed(int uuid_idx, const string& error_message) { return Status::OK(); } - bool DirManager::IsDirFailed(int uuid_idx) const { DCHECK_LT(uuid_idx, dirs_.size()); shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock()); diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h index d162ae8..88a17bd 100644 --- a/src/kudu/fs/dir_manager.h +++ b/src/kudu/fs/dir_manager.h @@ -166,7 +166,7 @@ class Dir { bool is_shutdown_; - // Protects 'last_space_check_', 'is_full_' and available_bytes_. + // Protects 'last_space_check_', 'is_full_' and 'available_bytes_'. mutable simple_spinlock lock_; MonoTime last_space_check_; bool is_full_; @@ -251,6 +251,11 @@ class DirManager { return failed_dirs_; } + bool AreAllDirsFailed() const { + shared_lock<rw_spinlock> group_lock(dir_group_lock_.get_lock()); + return failed_dirs_.size() == dirs_.size(); + } + // Return a list of the canonicalized root directory names. std::vector<std::string> GetRoots() const; @@ -341,14 +346,14 @@ class DirManager { // On success, 'instance_files' contains instance objects, including those // that failed to load because they were missing or because of a disk // error; they are still considered "loaded" and are labeled unhealthy - // internally. 'has_existing_instances' is set to true if any of the instance + // internally. 'has_healthy_instances' is set to true if any of the instance // files are healthy. // // Returns an error if an instance file fails in an irreconcileable way (e.g. // the file is locked). Status LoadInstances( std::vector<std::unique_ptr<DirInstanceMetadataFile>>* instance_files, - bool* has_existing_instances); + bool* has_healthy_instances); // Takes the set of instance files, does some basic verification on them, // creates any that don't exist on disk, and updates any that have a diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc index 2e631cb..06b83ed 100644 --- a/src/kudu/fs/file_block_manager.cc +++ b/src/kudu/fs/file_block_manager.cc @@ -949,9 +949,7 @@ Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) { GetAllBlockIdsForDir(this->env_, dd, bid_vec, s); }); } - for (const auto& dd : dd_manager_->dirs()) { - dd->WaitOnClosures(); - } + dd_manager_->WaitOnClosures(); // A failure on any data directory is fatal. for (const auto& s : statuses) { diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc index a60ef1a..3a747d6 100644 --- a/src/kudu/fs/fs_manager.cc +++ b/src/kudu/fs/fs_manager.cc @@ -30,7 +30,6 @@ #include <gflags/gflags.h> #include <glog/logging.h> -#include "kudu/fs/block_id.h" #include "kudu/fs/block_manager.h" #include "kudu/fs/data_dirs.h" #include "kudu/fs/error_manager.h" @@ -794,8 +793,4 @@ bool FsManager::BlockExists(const BlockId& block_id) const { return block_manager_->OpenBlock(block_id, &block).ok(); } -std::ostream& operator<<(std::ostream& o, const BlockId& block_id) { - return o << block_id.ToString(); -} - } // namespace kudu diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h index 82c3bbc..52e72bf 100644 --- a/src/kudu/fs/fs_manager.h +++ b/src/kudu/fs/fs_manager.h @@ -219,8 +219,6 @@ class FsManager { Status OpenBlock(const BlockId& block_id, std::unique_ptr<fs::ReadableBlock>* block); - Status DeleteBlock(const BlockId& block_id); - bool BlockExists(const BlockId& block_id) const; // ========================================================================== @@ -366,8 +364,6 @@ class FsManager { static const char *kTabletMetadataDirName; static const char *kWalDirName; static const char *kInstanceMetadataFileName; - static const char *kInstanceMetadataMagicNumber; - static const char *kTabletSuperBlockMagicNumber; static const char *kConsensusMetadataDirName; // The environment to be used for all filesystem operations. diff --git a/src/kudu/fs/fs_report.cc b/src/kudu/fs/fs_report.cc index 9689ca8..1519c00 100644 --- a/src/kudu/fs/fs_report.cc +++ b/src/kudu/fs/fs_report.cc @@ -39,12 +39,15 @@ using std::vector; using strings::Substitute; using strings::SubstituteAndAppend; +#define MERGE_ENTRIES_FROM(other) \ + entries.insert(entries.end(), (other).entries.begin(), (other).entries.end()) + /////////////////////////////////////////////////////////////////////////////// // MissingBlockCheck /////////////////////////////////////////////////////////////////////////////// void MissingBlockCheck::MergeFrom(const MissingBlockCheck& other) { - entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + MERGE_ENTRIES_FROM(other); } string MissingBlockCheck::ToString() const { @@ -85,15 +88,13 @@ void OrphanedBlockCheck::MergeFrom(const OrphanedBlockCheck& other) { string OrphanedBlockCheck::ToString() const { // Aggregate interesting stats from all of the entries. - int64_t orphaned_block_count_repaired = 0; int64_t orphaned_block_bytes = 0; + int64_t orphaned_block_count_repaired = 0; int64_t orphaned_block_bytes_repaired = 0; for (const auto& ob : entries) { - if (ob.repaired) { - orphaned_block_count_repaired++; - } orphaned_block_bytes += ob.length; if (ob.repaired) { + orphaned_block_count_repaired++; orphaned_block_bytes_repaired += ob.length; } } @@ -117,20 +118,18 @@ OrphanedBlockCheck::Entry::Entry(BlockId b, int64_t l) void LBMFullContainerSpaceCheck::MergeFrom( const LBMFullContainerSpaceCheck& other) { - entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + MERGE_ENTRIES_FROM(other); } string LBMFullContainerSpaceCheck::ToString() const { // Aggregate interesting stats from all of the entries. - int64_t full_container_space_count_repaired = 0; int64_t full_container_space_bytes = 0; + int64_t full_container_space_count_repaired = 0; int64_t full_container_space_bytes_repaired = 0; for (const auto& fcp : entries) { - if (fcp.repaired) { - full_container_space_count_repaired++; - } full_container_space_bytes += fcp.excess_bytes; if (fcp.repaired) { + full_container_space_count_repaired++; full_container_space_bytes_repaired += fcp.excess_bytes; } } @@ -154,7 +153,7 @@ LBMFullContainerSpaceCheck::Entry::Entry(string c, int64_t e) void LBMIncompleteContainerCheck::MergeFrom( const LBMIncompleteContainerCheck& other) { - entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + MERGE_ENTRIES_FROM(other); } string LBMIncompleteContainerCheck::ToString() const { @@ -180,7 +179,7 @@ LBMIncompleteContainerCheck::Entry::Entry(string c) /////////////////////////////////////////////////////////////////////////////// void LBMMalformedRecordCheck::MergeFrom(const LBMMalformedRecordCheck& other) { - entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + MERGE_ENTRIES_FROM(other); } string LBMMalformedRecordCheck::ToString() const { @@ -205,7 +204,7 @@ LBMMalformedRecordCheck::Entry::Entry(string c, BlockRecordPB* r) /////////////////////////////////////////////////////////////////////////////// void LBMMisalignedBlockCheck::MergeFrom(const LBMMisalignedBlockCheck& other) { - entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + MERGE_ENTRIES_FROM(other); } string LBMMisalignedBlockCheck::ToString() const { @@ -230,7 +229,7 @@ LBMMisalignedBlockCheck::Entry::Entry(string c, BlockId b) void LBMPartialRecordCheck::MergeFrom( const LBMPartialRecordCheck& other) { - entries.insert(entries.end(), other.entries.begin(), other.entries.end()); + MERGE_ENTRIES_FROM(other); } string LBMPartialRecordCheck::ToString() const { diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index 993ef80..a94e619 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -285,25 +285,25 @@ class LogWritableBlock : public WritableBlock { LogWritableBlock(LogBlockContainerRefPtr container, BlockId block_id, int64_t block_offset); - virtual ~LogWritableBlock(); + ~LogWritableBlock(); - virtual Status Close() OVERRIDE; + Status Close() override; - virtual Status Abort() OVERRIDE; + Status Abort() override; - virtual const BlockId& id() const OVERRIDE; + const BlockId& id() const override; - virtual BlockManager* block_manager() const OVERRIDE; + BlockManager* block_manager() const override; - virtual Status Append(const Slice& data) OVERRIDE; + Status Append(const Slice& data) override; - virtual Status AppendV(ArrayView<const Slice> data) OVERRIDE; + Status AppendV(ArrayView<const Slice> data) override; - virtual Status Finalize() OVERRIDE; + Status Finalize() override; - virtual size_t BytesAppended() const OVERRIDE; + size_t BytesAppended() const override; - virtual State state() const OVERRIDE; + State state() const override; // Actually close the block, finalizing it if it has not yet been // finalized. Also updates various metrics. @@ -365,8 +365,6 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { NO_SYNC }; - static const char* kMagic; - // Creates a new block container in 'dir'. static Status Create(LogBlockManager* block_manager, Dir* dir, @@ -1385,11 +1383,11 @@ class LogBlockCreationTransaction : public BlockCreationTransaction { public: LogBlockCreationTransaction() = default; - virtual ~LogBlockCreationTransaction() = default; + ~LogBlockCreationTransaction() = default; - virtual void AddCreatedBlock(unique_ptr<WritableBlock> block) override; + void AddCreatedBlock(unique_ptr<WritableBlock> block) override; - virtual Status CommitCreatedBlocks() override; + Status CommitCreatedBlocks() override; private: vector<unique_ptr<LogWritableBlock>> created_blocks_; @@ -1445,11 +1443,11 @@ class LogBlockDeletionTransaction : public BlockDeletionTransaction, // blocks. This includes: // 1. Punching holes in deleted blocks, and // 2. Deleting dead containers outright. - virtual ~LogBlockDeletionTransaction(); + ~LogBlockDeletionTransaction(); - virtual void AddDeletedBlock(BlockId block) override; + void AddDeletedBlock(BlockId block) override; - virtual Status CommitDeletedBlocks(vector<BlockId>* deleted) override; + Status CommitDeletedBlocks(vector<BlockId>* deleted) override; // Add the given block that needs to be deleted to 'deleted_interval_map_', // which keeps track of container and the range to be hole punched. @@ -1825,21 +1823,21 @@ class LogReadableBlock : public ReadableBlock { public: explicit LogReadableBlock(LogBlockRefPtr log_block); - virtual ~LogReadableBlock(); + ~LogReadableBlock(); - virtual Status Close() OVERRIDE; + Status Close() override; - virtual const BlockId& id() const OVERRIDE; + const BlockId& id() const override; - virtual BlockManager* block_manager() const OVERRIDE; + BlockManager* block_manager() const override; - virtual Status Size(uint64_t* sz) const OVERRIDE; + Status Size(uint64_t* sz) const override; - virtual Status Read(uint64_t offset, Slice result) const OVERRIDE; + Status Read(uint64_t offset, Slice result) const override; - virtual Status ReadV(uint64_t offset, ArrayView<Slice> results) const OVERRIDE; + Status ReadV(uint64_t offset, ArrayView<Slice> results) const override; - virtual size_t memory_footprint() const OVERRIDE; + size_t memory_footprint() const override; private: // A reference to this block's metadata. @@ -2093,13 +2091,12 @@ Status LogBlockManager::Open(FsReport* report) { auto* s = &statuses[i]; dd->ExecClosure([this, dd_raw, results, s]() { this->OpenDataDir(dd_raw, results, s); + WARN_NOT_OK(*s, Substitute("failed to open dir $0", dd_raw->dir())); }); } // Wait for the opens to complete. - for (const auto& dd : dd_manager_->dirs()) { - dd->WaitOnClosures(); - } + dd_manager_->WaitOnClosures(); // Check load errors and merge each data dir's container load results, then do repair tasks. vector<unique_ptr<internal::LogBlockContainerLoadResult>> dir_results( @@ -2150,9 +2147,7 @@ Status LogBlockManager::Open(FsReport* report) { } // Wait for the repair tasks to complete. - for (const auto& dd : dd_manager_->dirs()) { - dd->WaitOnClosures(); - } + dd_manager_->WaitOnClosures(); FsReport merged_report; for (int i = 0; i < dd_manager_->dirs().size(); ++i) { @@ -2166,7 +2161,7 @@ Status LogBlockManager::Open(FsReport* report) { RETURN_ON_NON_DISK_FAILURE(dd_manager_->dirs()[i], dir_results[i]->status); } - if (dd_manager_->GetFailedDirs().size() == dd_manager_->dirs().size()) { + if (dd_manager_->AreAllDirsFailed()) { return Status::IOError("All data dirs failed to open", "", EIO); } @@ -2227,8 +2222,7 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id, unique_ptr<BlockCreationTransaction> LogBlockManager::NewCreationTransaction() { CHECK(!opts_.read_only); - return unique_ptr<internal::LogBlockCreationTransaction>( - new internal::LogBlockCreationTransaction()); + return std::make_unique<internal::LogBlockCreationTransaction>(); } shared_ptr<BlockDeletionTransaction> LogBlockManager::NewDeletionTransaction() { @@ -2430,7 +2424,6 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids, for (const auto& block_id : block_ids) { LogBlockRefPtr lb; Status s = RemoveLogBlock(block_id, &lb); - // If we get NotFound, then the block was already deleted. if (!s.ok() && !s.IsNotFound()) { if (first_failure.ok()) first_failure = s; } else if (s.ok()) { @@ -2438,6 +2431,8 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids, blocks_length += lb->length(); lbs.emplace_back(std::move(lb)); } else { + // If we get NotFound, then the block was already deleted. + DCHECK(s.IsNotFound()); deleted->emplace_back(block_id); } } @@ -2521,8 +2516,6 @@ void LogBlockManager::OpenDataDir( Dir* dir, vector<unique_ptr<internal::LogBlockContainerLoadResult>>* results, Status* result_status) { - // Find all containers and open them. - unordered_set<string> containers_seen; vector<string> children; Status s = env_->GetChildren(dir->dir(), &children); if (!s.ok()) { @@ -2533,6 +2526,9 @@ void LogBlockManager::OpenDataDir( return; } + // Find all containers and open them. + unordered_set<string> containers_seen; + results->reserve(children.size() / 2); for (const string& child : children) { string container_name; if (!TryStripSuffixString( @@ -2848,15 +2844,16 @@ Status LogBlockManager::Repair( // This is a fatal inconsistency; if the repair fails, we cannot proceed. if (report->partial_record_check) { for (auto& pr : report->partial_record_check->entries) { - unique_ptr<RWFile> file; - RWFileOptions opts; - opts.mode = Env::MUST_EXIST; LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, pr.container); if (!container) { // The container was deleted outright. pr.repaired = true; continue; } + + unique_ptr<RWFile> file; + RWFileOptions opts; + opts.mode = Env::MUST_EXIST; RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND( env_->NewRWFile(opts, StrCat(pr.container, kContainerMetadataFileSuffix), @@ -2926,8 +2923,7 @@ Status LogBlockManager::Repair( // // Register deletions to a single BlockDeletionTransaction. So, the repunched // holes belonging to the same container can be coalesced. - shared_ptr<LogBlockDeletionTransaction> transaction = - std::make_shared<LogBlockDeletionTransaction>(this); + auto transaction = std::make_shared<LogBlockDeletionTransaction>(this); for (const auto& b : need_repunching) { b->RegisterDeletion(transaction); transaction->AddBlock(b); @@ -2991,11 +2987,11 @@ Status LogBlockManager::Repair( Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container, const vector<BlockRecordPB>& records, int64_t* file_bytes_delta) { - uint64_t old_metadata_size; const string metadata_file_name = StrCat(container.ToString(), kContainerMetadataFileSuffix); - // Get the container's data directory's UUID for error handling. const string dir = container.data_dir()->dir(); + + uint64_t old_metadata_size; RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->GetFileSize(metadata_file_name, &old_metadata_size), "could not get size of old metadata file"); diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h index 91c95f3..b077947 100644 --- a/src/kudu/fs/log_block_manager.h +++ b/src/kudu/fs/log_block_manager.h @@ -192,7 +192,7 @@ class LogBlockManager : public BlockManager { FileCache* file_cache, BlockManagerOptions opts); - virtual ~LogBlockManager(); + ~LogBlockManager(); Status Open(FsReport* report) override; @@ -375,9 +375,6 @@ class LogBlockManager : public BlockManager { LogBlockContainerRefPtr container, internal::LogBlockContainerLoadResult* result); - // Perform basic initialization. - Status Init(); - ObjectIdGenerator* oid_generator() { return &oid_generator_; } Env* env() const { return env_; }
