This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 68797aab6a1740da15a24cb0681fe9be7685b787 Author: Yingchun Lai <[email protected]> AuthorDate: Sun Mar 12 22:41:48 2023 +0800 KUDU-3371 [fs] make LogBlockContainer a base class This patch makes LogBlockContainer a base class for the newly added LogBlockContainerNativeMeta. LogBlockContainerNativeMeta is a log-backed block container which contains a sequentially written file to store containers' metadata; that is what LogBlockContainer used to be prior to this change. Change-Id: Ieb199de95973aaeba76947ad095907272d84ca67 Reviewed-on: http://gerrit.cloudera.org:8080/19610 Tested-by: Kudu Jenkins Reviewed-by: Yuqi Du <[email protected]> --- src/kudu/fs/log_block_manager.cc | 750 ++++++++++++++++++++++++--------------- src/kudu/fs/log_block_manager.h | 74 +++- 2 files changed, 524 insertions(+), 300 deletions(-) diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index bc1852d42..cc3ec7427 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -205,6 +205,7 @@ METRIC_DEFINE_gauge_uint64(server, log_block_manager_processed_containers_startu using kudu::fs::internal::LogBlock; using kudu::fs::internal::LogBlockContainer; +using kudu::fs::internal::LogBlockContainerNativeMeta; using kudu::fs::internal::LogBlockDeletionTransaction; using kudu::fs::internal::LogWritableBlock; using kudu::pb_util::ReadablePBContainerFile; @@ -427,28 +428,8 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { NO_SYNC }; - // Creates a new block container in 'dir'. - static Status Create(LogBlockManager* block_manager, - Dir* dir, - LogBlockContainerRefPtr* container); - - // Opens an existing block container in 'dir'. - // - // Every container is comprised of two parts: "<dir>/<id>.data" and - // metadata (e.g. "<dir>/<id>.metadata"). Together, 'dir' and 'id' - // fully describe both files. 
- // - // Returns Status::Aborted() in the case that the metadata and data files - // both appear to have no data (e.g. due to a crash just after creating - // one of them but before writing any records). This is recorded in 'report'. - static Status Open(LogBlockManager* block_manager, - Dir* dir, - FsReport* report, - const string& id, - LogBlockContainerRefPtr* container); - // The destructor will delete files of this container if it is dead. - ~LogBlockContainer(); + virtual ~LogBlockContainer() = default; // Closes a set of blocks belonging to this container, possibly synchronizing // the dirty data and metadata to disk. @@ -458,7 +439,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // Frees the space associated with a block or a group of blocks at 'offset' // and 'length'. This is a physical operation, not a logical one; a separate - // AppendMetadataForDeleteRecords() is required to record the deletions in + // RemoveBlockIdsFromMetadata() is required to record the deletions in // container metadata. // // The on-disk effects of this call are made durable only after SyncData(). @@ -488,18 +469,18 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // See RWFile::ReadV(). Status ReadVData(int64_t offset, ArrayView<Slice> results) const; - // Appends block ids to delete to this container's metadata file according to 'lbs', - // the block ids deleted successfully are returned by 'deleted_block_ids', even if + // Removes block ids from this container's metadata part according to 'lbs', + // the block ids removed successfully are returned by 'deleted_block_ids', even if // returning non-OK status. // // The on-disk effects of this call are made durable only after SyncMetadata(). 
- Status AppendMetadataForDeleteRecords(const vector<LogBlockRefPtr>& lbs, - vector<BlockId>* deleted_block_ids); + virtual Status RemoveBlockIdsFromMetadata(const vector<LogBlockRefPtr>& lbs, + vector<BlockId>* deleted_block_ids) = 0; - // Appends 'blocks' to add to this container's metadata file. + // Adds 'blocks' to this container's metadata part. // // The on-disk effects of this call are made durable only after SyncMetadata(). - Status AppendMetadataForCreateRecords(const vector<LogWritableBlock*>& blocks); + virtual Status AddBlockIdsToMetadata(const vector<LogWritableBlock*>& blocks) = 0; // Asynchronously flush this container's data file from 'offset' through // to 'length'. @@ -513,15 +494,11 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // TODO(unknown): Add support to synchronize just a range. Status SyncData(); - // Synchronize this container's metadata file with the disk. On success, + // Synchronize this container's metadata part with the disk. On success, // guarantees that the metadata is made durable. // // TODO(unknown): Add support to synchronize just a range. - Status SyncMetadata(); - - // Reopen the metadata file record writer. Should be called if the underlying - // file was changed. - Status ReopenMetadataWriter(); + virtual Status SyncMetadata() = 0; // Truncates this container's data file to 'next_block_offset_' if it is // full. This effectively removes any preallocated but unused space. @@ -540,6 +517,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { kReadOnly, // Read records only. kReadAndUpdate, // Read records and update container's statistic. }; + // Reads the container's metadata from disk, sanity checking and processing // records along the way. 
// @@ -550,13 +528,13 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // // Returns an error only if there was a problem accessing the container from // disk; such errors are fatal and effectively halt processing immediately. - Status ProcessRecords( + virtual Status ProcessRecords( FsReport* report, LogBlockManager::UntrackedBlockMap* live_blocks, LogBlockManager::BlockRecordMap* live_block_records, vector<LogBlockRefPtr>* dead_blocks, uint64_t* max_block_id, - ProcessRecordType type); + ProcessRecordType type) = 0; // Updates internal bookkeeping state to reflect the creation of a block. void BlockCreated(const LogBlockRefPtr& block); @@ -602,6 +580,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // Simple accessors. LogBlockManager* block_manager() const { return block_manager_; } + const string& id() const { return id_; } int64_t next_block_offset() const { return next_block_offset_.Load(); } int64_t total_bytes() const { return total_bytes_.Load(); } int64_t total_blocks() const { return total_blocks_.Load(); } @@ -609,24 +588,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); } int64_t live_blocks() const { return live_blocks_.Load(); } int32_t blocks_being_written() const { return blocks_being_written_.Load(); } - bool full() const { - if (next_block_offset() >= FLAGS_log_container_max_size || - (max_num_blocks_ && total_blocks() >= max_num_blocks_)) { - return true; - } - - if (FLAGS_log_container_metadata_max_size <= 0) { - return false; - } - - // Try lock before reading metadata offset, consider it not full if lock failed. 
- shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock); - if (!l.owns_lock()) { - return false; - } - return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size; - } - + virtual bool full() const { return data_full(); } bool dead() const { return dead_.Load(); } const LogBlockManagerMetrics* metrics() const { return metrics_; } Dir* data_dir() const { return data_dir_; } @@ -663,53 +625,14 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { return dead_.CompareAndSet(false, true); } - bool ShouldCompact() const { - shared_lock<RWMutex> l(metadata_compact_lock_); - return ShouldCompactUnlocked(); - } - - bool ShouldCompactUnlocked() const { - DCHECK_GT(FLAGS_log_container_metadata_max_size, 0); - if (live_blocks() >= - total_blocks() * FLAGS_log_container_live_metadata_before_compact_ratio) { - return false; - } - - return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size * - FLAGS_log_container_metadata_size_before_compact_ratio; - } - - void CompactMetadata(); + // Some work will be triggered after blocks have been removed from this container successfully. + virtual void PostWorkOfBlocksDeleted() {} static std::vector<BlockRecordPB> SortRecords(LogBlockManager::BlockRecordMap live_block_records); - private: - LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir, - unique_ptr<WritablePBContainerFile> metadata_file, - shared_ptr<RWFile> data_file); - - // Check the container whether it is fine. 
- // - // OK: both files of the container exist; - // Aborted: the container will be repaired later; - // NotFound: one file of the container has gone missing; - // *-------------*------------*----------------*-------------------------*---------------------* - // |DATA\METADATA| NONE EXIST | EXIST && < MIN | EXIST && NO LIVE BLOCKS | EXIST && LIVE BLOCKS| - // *-------------*------------*----------------*-------------------------*---------------------* - // | NONE EXIST | | Aborted | Aborted | NotFound | - // *-------------*------------*----------------*-------------------------*---------------------* - // | EXIST && 0 | Aborted | Aborted | OK | OK | - // *-------------*------------*----------------*-------------------------*---------------------* - // | EXIST && >0 | NotFound | OK | OK | OK | - // *-------------*------------*----------------*-------------------------*---------------------* - // - // Note: the status here only represents the result of check. - static Status CheckContainerFiles(LogBlockManager* block_manager, - FsReport* report, - const Dir* dir, - const string& common_path, - const string& data_path, - const string& metadata_path); + protected: + LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir, string id, + shared_ptr<RWFile> data_file, uint64_t data_file_size); // Processes a single block record, performing sanity checks on it and adding // it either to 'live_blocks' or 'dead_blocks'. If the record is live, it is @@ -735,6 +658,15 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { uint64_t* max_block_id, ProcessRecordType type); + // Similar to above, but only processes a single DELETE type block record. 
+ virtual void ProcessDeleteRecord( + BlockRecordPB* record, + FsReport* report, + LogBlockManager::UntrackedBlockMap* live_blocks, + LogBlockManager::BlockRecordMap* live_block_records, + vector<LogBlockRefPtr>* dead_blocks, + ProcessRecordType type) = 0; + // Updates this container data file's position based on the offset and length // of a block, marking this container as full if needed. Should only be called // when a block is fully written, or after an encryption header is written, as @@ -743,23 +675,26 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // This function is thread unsafe. void UpdateNextBlockOffset(int64_t block_offset, int64_t block_length); + // Check whether the data file is full. + bool data_full() const { + return next_block_offset() >= FLAGS_log_container_max_size || + (max_num_blocks_ && total_blocks() >= max_num_blocks_); + } + // The owning block manager. Must outlive the container itself. LogBlockManager* const block_manager_; // The data directory where the container lives. Dir* data_dir_; + // The unique id of this container. + string id_; + const optional<int64_t> max_num_blocks_; // Offset up to which we have preallocated bytes. int64_t preallocated_offset_ = 0; - // Protect 'metadata_file_', only rewriting should add write lock, - // appending and syncing only need read lock, cause there is an - // internal lock for these operations in WritablePBContainerFile. - mutable RWMutex metadata_compact_lock_; - // Opened file handles to the container's files. - unique_ptr<WritablePBContainerFile> metadata_file_; shared_ptr<RWFile> data_file_; // The offset of the next block to be written to the container. @@ -768,9 +703,6 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // The amount of data (post block alignment) written thus far to the container. AtomicInt<int64_t> total_bytes_; - // TODO(yingchun): add metadata bytes for metadata. 
- //AtomicInt<int64_t> metadata_bytes_; - // The number of blocks written thus far in the container. AtomicInt<int64_t> total_blocks_; @@ -803,6 +735,151 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { DISALLOW_COPY_AND_ASSIGN(LogBlockContainer); }; +//////////////////////////////////////////////////////////// +// LogBlockContainerNativeMeta +//////////////////////////////////////////////////////////// + +// The metadata part is a log-backed file, named "<dir>/<id>.metadata". +class LogBlockContainerNativeMeta final : public LogBlockContainer { + public: + // Creates a new LogBlockContainer managed by 'block_manager' in 'dir', 'container' will be set as + // the newly created container. + // + // Returns Status::OK() if created successfully, otherwise returns an error. + static Status Create(LogBlockManager* block_manager, + Dir* dir, + LogBlockContainerRefPtr* container); + + // Opens an existing LogBlockContainer in 'dir'. + // + // Every container is comprised of two parts: "<dir>/<id>.data" and + // metadata (i.e. "<dir>/<id>.metadata"). Together, 'dir' and 'id' + // fully describe both files. + // + // Returns Status::Aborted() in the case that the metadata and data files + // both appear to have no data (e.g. due to a crash just after creating + // one of them but before writing any records). This is recorded in 'report'. + static Status Open(LogBlockManager* block_manager, + Dir* dir, + FsReport* report, + const string& id, + LogBlockContainerRefPtr* container); + + // Check the container whether it is fine. 
+ // + // OK: both files of the container exist; + // Aborted: the container will be repaired later; + // NotFound: one file of the container has gone missing; + // *-------------*------------*----------------*-------------------------*---------------------* + // |DATA\METADATA| NONE EXIST | EXIST && < MIN | EXIST && NO LIVE BLOCKS | EXIST && LIVE BLOCKS| + // *-------------*------------*----------------*-------------------------*---------------------* + // | NONE EXIST | | Aborted | Aborted | NotFound | + // *-------------*------------*----------------*-------------------------*---------------------* + // | EXIST && 0 | Aborted | Aborted | OK | OK | + // *-------------*------------*----------------*-------------------------*---------------------* + // | EXIST && >0 | NotFound | OK | OK | OK | + // *-------------*------------*----------------*-------------------------*---------------------* + // + // Note: the status here only represents the result of check. + static Status CheckContainerFiles(LogBlockManager* block_manager, + FsReport* report, + const Dir* dir, + const string& common_path, + const string& data_path, + const string& metadata_path); + + LogBlockContainerNativeMeta(LogBlockManager* block_manager, Dir* data_dir, string id, + shared_ptr<RWFile> data_file, uint64_t data_file_size, + unique_ptr<WritablePBContainerFile> metadata_file); + + ~LogBlockContainerNativeMeta() override; + + Status RemoveBlockIdsFromMetadata(const vector<LogBlockRefPtr>& lbs, + vector<BlockId>* deleted_block_ids) override; + + Status AddBlockIdsToMetadata(const vector<LogWritableBlock*>& blocks) override; + + Status SyncMetadata() override; + + // Reopen the metadata files record writer. Should be called if the underlying + // file was changed. 
+ Status ReopenMetadataWriter(); + + Status ProcessRecords( + FsReport* report, + LogBlockManager::UntrackedBlockMap* live_blocks, + LogBlockManager::BlockRecordMap* live_block_records, + vector<LogBlockRefPtr>* dead_blocks, + uint64_t* max_block_id, + ProcessRecordType type) override; + + bool full() const override { + if (LogBlockContainer::full()) { + return true; + } + + if (FLAGS_log_container_metadata_max_size <= 0) { + return false; + } + + // Try lock before reading metadata offset, consider it not full if lock failed. + shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock); + if (!l.owns_lock()) { + return false; + } + return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size; + } + + void PostWorkOfBlocksDeleted() override; + + private: + // Performs sanity checks on it and removing it from 'live_blocks' and + // 'live_block_records', adds it to 'dead_blocks'. + // + // 'report' is updated with any inconsistencies found in the record. + // + // Note: 'record' may be swapped into 'report'; do not use it after calling + // this function. 
+ void ProcessDeleteRecord( + BlockRecordPB* record, + FsReport* report, + LogBlockManager::UntrackedBlockMap* live_blocks, + LogBlockManager::BlockRecordMap* live_block_records, + vector<LogBlockRefPtr>* dead_blocks, + ProcessRecordType type) override; + + bool ShouldCompactUnlocked() const { + DCHECK_GT(FLAGS_log_container_metadata_max_size, 0); + if (live_blocks() >= + total_blocks() * FLAGS_log_container_live_metadata_before_compact_ratio) { + return false; + } + + return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size * + FLAGS_log_container_metadata_size_before_compact_ratio; + } + + bool ShouldCompact() const { + shared_lock<RWMutex> l(metadata_compact_lock_); + return ShouldCompactUnlocked(); + } + + void CompactMetadata(); + + // Protect 'metadata_file_', only rewriting should add write lock, + // appending and syncing only need read lock, cause there is an + // internal lock for these operations in WritablePBContainerFile. + mutable RWMutex metadata_compact_lock_; + + // Opened file handles to the container's files. + unique_ptr<WritablePBContainerFile> metadata_file_; + + // TODO(yingchun): add metadata bytes for metadata. 
+ // AtomicInt<int64_t> metadata_bytes_; + + DISALLOW_COPY_AND_ASSIGN(LogBlockContainerNativeMeta); +}; + #define CONTAINER_DISK_FAILURE(status_expr, msg) do { \ Status s_ = (status_expr); \ HANDLE_DISK_FAILURE(s_, block_manager_->error_manager_->RunErrorNotificationCb( \ @@ -813,14 +890,15 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { LogBlockContainer::LogBlockContainer( LogBlockManager* block_manager, Dir* data_dir, - unique_ptr<WritablePBContainerFile> metadata_file, - shared_ptr<RWFile> data_file) + string id, + shared_ptr<RWFile> data_file, + uint64_t data_file_size) : block_manager_(block_manager), data_dir_(data_dir), + id_(std::move(id)), max_num_blocks_(FindOrDie(block_manager->block_limits_by_data_dir_, data_dir)), - metadata_compact_lock_(RWMutex::Priority::PREFER_READING), - metadata_file_(std::move(metadata_file)), + preallocated_offset_(data_file_size), data_file_(std::move(data_file)), next_block_offset_(0), total_bytes_(0), @@ -842,7 +920,20 @@ LogBlockContainer::LogBlockContainer( } } -LogBlockContainer::~LogBlockContainer() { +LogBlockContainerNativeMeta::LogBlockContainerNativeMeta( + LogBlockManager* block_manager, + Dir* data_dir, + string id, + shared_ptr<RWFile> data_file, + uint64_t data_file_size, + unique_ptr<WritablePBContainerFile> metadata_file) + : LogBlockContainer(block_manager, data_dir, std::move(id), + std::move(data_file), data_file_size), + metadata_compact_lock_(RWMutex::Priority::PREFER_READING), + metadata_file_(std::move(metadata_file)) { +} + +LogBlockContainerNativeMeta::~LogBlockContainerNativeMeta() { if (dead()) { CHECK(!block_manager_->opts_.read_only); string data_file_name = data_file_->filename(); @@ -863,6 +954,8 @@ LogBlockContainer::~LogBlockContainer() { metadata_failure_msg); } } + data_file_.reset(); + metadata_file_.reset(); } void LogBlockContainer::HandleError(const Status& s) const { @@ -871,7 +964,7 @@ void LogBlockContainer::HandleError(const Status& s) const { 
data_dir_)); } -void LogBlockContainer::CompactMetadata() { +void LogBlockContainerNativeMeta::CompactMetadata() { SCOPED_LOG_SLOW_EXECUTION(WARNING, 5, Substitute("CompactMetadata $0", ToString())); // Skip compacting if lock failed to reduce overhead, metadata is on compacting or will be // compacted next time. @@ -939,9 +1032,12 @@ void LogBlockContainer::CompactMetadata() { block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); \ } while (0) -Status LogBlockContainer::Create(LogBlockManager* block_manager, - Dir* dir, - LogBlockContainerRefPtr* container) { +Status LogBlockContainerNativeMeta::Create(LogBlockManager* block_manager, + Dir* dir, + LogBlockContainerRefPtr* container) { + DCHECK(container); + DCHECK(!container->get()); + string id; string common_path; string metadata_path; string data_path; @@ -954,11 +1050,10 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager, // // When looping, we delete any created-and-orphaned files. 
do { - common_path = JoinPathSegments(dir->dir(), - block_manager->oid_generator()->Next()); + id = block_manager->oid_generator()->Next(); + common_path = JoinPathSegments(dir->dir(), id); metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix); data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix); - if (PREDICT_TRUE(block_manager->file_cache_)) { if (metadata_writer) { WARN_NOT_OK(block_manager->file_cache_->DeleteFile(metadata_path), @@ -999,31 +1094,35 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager, unique_ptr<WritablePBContainerFile> metadata_file(new WritablePBContainerFile( std::move(metadata_writer))); RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB())); - container->reset(new LogBlockContainer(block_manager, - dir, - std::move(metadata_file), - std::move(data_file))); + container->reset(new LogBlockContainerNativeMeta(block_manager, + dir, + id, + std::move(data_file), + 0 /*data_file_size*/, + std::move(metadata_file))); VLOG(1) << "Created log block container " << (*container)->ToString(); } // Prefer metadata status (arbitrarily). FsErrorManager* em = block_manager->error_manager(); - HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb( - ErrorHandlerType::DISK_ERROR, dir)); + HANDLE_DISK_FAILURE(metadata_status, + em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); return !metadata_status.ok() ? 
metadata_status : data_status; } -Status LogBlockContainer::Open(LogBlockManager* block_manager, - Dir* dir, - FsReport* report, - const string& id, - LogBlockContainerRefPtr* container) { +Status LogBlockContainerNativeMeta::Open(LogBlockManager* block_manager, + Dir* dir, + FsReport* report, + const string& id, + LogBlockContainerRefPtr* container) { + DCHECK(container); + DCHECK(!container->get()); string common_path = JoinPathSegments(dir->dir(), id); string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix); string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix); - RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir, - common_path, data_path, metadata_path)); + RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir, common_path, + data_path, metadata_path)); // Open the existing metadata and data files for writing. shared_ptr<RWFile> metadata_file; @@ -1038,11 +1137,11 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager, opts.mode = Env::MUST_EXIST; opts.is_sensitive = true; unique_ptr<RWFile> rwf; - RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts, - metadata_path, &rwf)); + RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile( + opts, metadata_path, &rwf)); metadata_file.reset(rwf.release()); - RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts, - data_path, &rwf)); + RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile( + opts, data_path, &rwf)); data_file.reset(rwf.release()); } @@ -1054,22 +1153,19 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager, RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size)); // Create the in-memory container and populate it. 
- LogBlockContainerRefPtr open_container(new LogBlockContainer(block_manager, - dir, - std::move(metadata_pb_writer), - std::move(data_file))); - open_container->preallocated_offset_ = data_file_size; + LogBlockContainerRefPtr open_container(new LogBlockContainerNativeMeta( + block_manager, dir, id, std::move(data_file), data_file_size, std::move(metadata_pb_writer))); VLOG(1) << "Opened log block container " << open_container->ToString(); container->swap(open_container); return Status::OK(); } -Status LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager, - FsReport* report, - const Dir* dir, - const string& common_path, - const string& data_path, - const string& metadata_path) { +Status LogBlockContainerNativeMeta::CheckContainerFiles(LogBlockManager* block_manager, + FsReport* report, + const Dir* dir, + const string& common_path, + const string& data_path, + const string& metadata_path) { Env* env = block_manager->env(); uint64_t data_size = 0; Status s_data = env->GetFileSize(data_path, &data_size); @@ -1151,7 +1247,7 @@ Status LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager, Status LogBlockContainer::TruncateDataToNextBlockOffset() { RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); - if (full()) { + if (data_full()) { VLOG(2) << Substitute("Truncating container $0 to offset $1", ToString(), next_block_offset()); RETURN_NOT_OK_HANDLE_ERROR(data_file_->Truncate(next_block_offset())); @@ -1159,7 +1255,7 @@ Status LogBlockContainer::TruncateDataToNextBlockOffset() { return Status::OK(); } -Status LogBlockContainer::ProcessRecords( +Status LogBlockContainerNativeMeta::ProcessRecords( FsReport* report, LogBlockManager::UntrackedBlockMap* live_blocks, LogBlockManager::BlockRecordMap* live_block_records, @@ -1216,14 +1312,12 @@ Status LogBlockContainer::ProcessRecord( uint64_t* data_file_size, uint64_t* max_block_id, ProcessRecordType type) { - const BlockId block_id(BlockId::FromPB(record->block_id())); - LogBlockRefPtr lb; 
switch (record->op_type()) { - case CREATE: + case CREATE: { // First verify that the record's offset/length aren't wildly incorrect. if (PREDICT_FALSE(!record->has_offset() || !record->has_length() || - record->offset() < 0 || + record->offset() < 0 || record->length() < 0)) { report->malformed_record_check->entries.emplace_back(ToString(), record); break; @@ -1250,7 +1344,8 @@ Status LogBlockContainer::ProcessRecord( break; } - lb = new LogBlock(this, block_id, record->offset(), record->length()); + const BlockId block_id(BlockId::FromPB(record->block_id())); + LogBlockRefPtr lb = new LogBlock(this, block_id, record->offset(), record->length()); if (!InsertIfNotPresent(live_blocks, block_id, lb)) { // We found a record whose ID matches that of an already created block. // @@ -1279,22 +1374,10 @@ Status LogBlockContainer::ProcessRecord( (*live_block_records)[block_id].Swap(record); *max_block_id = std::max(*max_block_id, block_id.id()); break; + } case DELETE: - lb = EraseKeyReturnValuePtr(live_blocks, block_id); - if (!lb) { - // We found a record for which there is no already created block. - // - // TODO(adar): treat as a different kind of inconsistency? - report->malformed_record_check->entries.emplace_back(ToString(), record); - break; - } - VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString()); - if (type == ProcessRecordType::kReadAndUpdate) { - BlockDeleted(lb); - } - - CHECK_EQ(1, live_block_records->erase(block_id)); - dead_blocks->emplace_back(std::move(lb)); + ProcessDeleteRecord(record, report, live_blocks, + live_block_records, dead_blocks, type); break; default: // We found a record with an unknown type. 
@@ -1306,6 +1389,31 @@ Status LogBlockContainer::ProcessRecord( return Status::OK(); } +void LogBlockContainerNativeMeta::ProcessDeleteRecord( + BlockRecordPB* record, + FsReport* report, + LogBlockManager::UntrackedBlockMap* live_blocks, + LogBlockManager::BlockRecordMap* live_block_records, + vector<LogBlockRefPtr>* dead_blocks, + ProcessRecordType type) { + const BlockId block_id(BlockId::FromPB(record->block_id())); + LogBlockRefPtr lb = EraseKeyReturnValuePtr(live_blocks, block_id); + if (!lb) { + // We found a record for which there is no already created block. + // + // TODO(adar): treat as a different kind of inconsistency? + report->malformed_record_check->entries.emplace_back(ToString(), record); + return; + } + VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString()); + if (type == ProcessRecordType::kReadAndUpdate) { + BlockDeleted(lb); + } + + CHECK_EQ(1, live_block_records->erase(block_id)); + dead_blocks->emplace_back(std::move(lb)); +} + Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks, SyncMode mode) { auto sync_blocks = [&]() { @@ -1316,7 +1424,7 @@ Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks, // Append metadata only after data is synced so that there's // no chance of metadata landing on the disk before the data. 
- RETURN_NOT_OK_PREPEND(AppendMetadataForCreateRecords(blocks), + RETURN_NOT_OK_PREPEND(AddBlockIdsToMetadata(blocks), "unable to append creation record(s) to block metadata during close"); if (mode == SYNC) { @@ -1408,8 +1516,8 @@ Status LogBlockContainer::SyncData() { return Status::OK(); } -Status LogBlockContainer::AppendMetadataForDeleteRecords(const vector<LogBlockRefPtr>& lbs, - vector<BlockId>* deleted_block_ids) { +Status LogBlockContainerNativeMeta::RemoveBlockIdsFromMetadata( + const vector<LogBlockRefPtr>& lbs, vector<BlockId>* deleted_block_ids) { RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); // Note: We don't check for sufficient disk space for metadata writes in // order to allow for block deletion on full disks. @@ -1441,7 +1549,8 @@ Status LogBlockContainer::AppendMetadataForDeleteRecords(const vector<LogBlockRe return Status::OK(); } -Status LogBlockContainer::AppendMetadataForCreateRecords(const vector<LogWritableBlock*>& blocks) { +Status LogBlockContainerNativeMeta::AddBlockIdsToMetadata( + const vector<LogWritableBlock*>& blocks) { RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); vector<BlockRecordPB> records; records.reserve(blocks.size()); @@ -1463,7 +1572,7 @@ Status LogBlockContainer::AppendMetadataForCreateRecords(const vector<LogWritabl return Status::OK(); } -Status LogBlockContainer::SyncMetadata() { +Status LogBlockContainerNativeMeta::SyncMetadata() { VLOG(3) << "Syncing metadata file " << metadata_file_->filename(); RETURN_NOT_OK_HANDLE_ERROR(read_only_status()); if (FLAGS_enable_data_block_fsync) { @@ -1474,7 +1583,7 @@ Status LogBlockContainer::SyncMetadata() { return Status::OK(); } -Status LogBlockContainer::ReopenMetadataWriter() { +Status LogBlockContainerNativeMeta::ReopenMetadataWriter() { shared_ptr<RWFile> f; if (PREDICT_TRUE(block_manager_->file_cache_)) { RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenFile<Env::MUST_EXIST>( @@ -1484,8 +1593,8 @@ Status LogBlockContainer::ReopenMetadataWriter() { 
RWFileOptions opts; opts.mode = Env::MUST_EXIST; opts.is_sensitive = true; - RETURN_NOT_OK_HANDLE_ERROR(block_manager_->env_->NewRWFile(opts, - metadata_file_->filename(), &f_uniq)); + RETURN_NOT_OK_HANDLE_ERROR( + block_manager_->env_->NewRWFile(opts, metadata_file_->filename(), &f_uniq)); f.reset(f_uniq.release()); } unique_ptr<WritablePBContainerFile> w; @@ -1567,7 +1676,7 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t bloc instance()->filesystem_block_size_bytes()); next_block_offset_.StoreMax(new_next_block_offset); - if (full()) { + if (data_full()) { VLOG(1) << Substitute( "Container $0 with size $1 is now full, max size is $2", ToString(), next_block_offset(), FLAGS_log_container_max_size); @@ -1846,7 +1955,7 @@ struct LogBlockContainerLoadResult { vector<LogBlockContainerRefPtr> dead_containers; // Keep track of containers whose live block ratio is low; their metadata // files will be compacted during repair. - unordered_map<string, vector<BlockRecordPB>> low_live_block_containers; + ContainerBlocksByName low_live_block_containers; // Keep track of deleted blocks whose space hasn't been punched; they will // be repunched during repair. 
vector<LogBlockRefPtr> need_repunching_blocks; @@ -2246,12 +2355,12 @@ LogBlockManager::LogBlockManager(Env* env, next_block_id_(1) { managed_block_shards_.resize(kBlockMapChunk); for (auto& mb : managed_block_shards_) { - mb.lock = std::unique_ptr<simple_spinlock>(new simple_spinlock()); + mb.lock = unique_ptr<simple_spinlock>(new simple_spinlock()); mb.blocks_by_block_id - = std::unique_ptr<BlockMap>(new BlockMap(10, - BlockMap::hasher(), - BlockMap::key_equal(), - BlockAllocator(mem_tracker_))); + = unique_ptr<BlockMap>(new BlockMap(10, + BlockMap::hasher(), + BlockMap::key_equal(), + BlockAllocator(mem_tracker_))); mb.blocks_by_block_id->set_deleted_key(BlockId()); } @@ -2308,6 +2417,21 @@ LogBlockManager::~LogBlockManager() { } \ } while (false) +Status LogBlockManagerNativeMeta::CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) { + DCHECK(container); + DCHECK(!container->get()); + return LogBlockContainerNativeMeta::Create(this, dir, container); +} + +Status LogBlockManagerNativeMeta::OpenContainer(Dir* dir, + FsReport* report, + const string& id, + LogBlockContainerRefPtr* container) { + DCHECK(container); + DCHECK(!container->get()); + return LogBlockContainerNativeMeta::Open(this, dir, report, id, container); +} + Status LogBlockManager::Open(FsReport* report, std::atomic<int>* containers_processed, std::atomic<int>* containers_total) { // Establish (and log) block limits for each data directory using kernel, @@ -2491,12 +2615,12 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id, unique_ptr<ReadableBlock>* block) { LogBlockRefPtr lb; { - int index = block_id.id() & kBlockMapMask; + auto index = block_id.id() & kBlockMapMask; std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); lb = FindPtrOrNull(*managed_block_shards_[index].blocks_by_block_id, block_id); } if (!lb) { - return Status::NotFound("Can't find block", block_id.ToString()); + return Status::NotFound("Can't find block when opening log block", 
block_id.ToString()); } VLOG(3) << "Opened block " << block_id @@ -2586,7 +2710,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts, // All containers are in use; create a new one. LogBlockContainerRefPtr new_container; - Status s = LogBlockContainer::Create(this, dir, &new_container); + Status s = CreateContainer(dir, &new_container); // We could create a container in a different directory, but there's // currently no point in doing so. On disk failure, the tablet specified by @@ -2649,7 +2773,7 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) { return false; } - int index = block_id.id() & kBlockMapMask; + auto index = block_id.id() & kBlockMapMask; std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); if (ContainsKey(*managed_block_shards_[index].blocks_by_block_id, block_id)) { return false; @@ -2754,7 +2878,7 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids, // // TODO(unknown): what if this fails? Should we restore the in-memory block? vector<BlockId> deleted_block_ids; - Status s = container->AppendMetadataForDeleteRecords(clbs, &deleted_block_ids); + Status s = container->RemoveBlockIdsFromMetadata(clbs, &deleted_block_ids); // We don't bother fsyncing the metadata append for deletes in order to avoid // the disk overhead. Even if we did fsync it, we'd still need to account for @@ -2763,19 +2887,14 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids, // // TODO(KUDU-829): Implement GC of orphaned blocks. // TODO(yingchun): Add some metrics to track the number of orphaned blocks. - if (!s.ok()) { + if (s.ok()) { + container->PostWorkOfBlocksDeleted(); + } else { if (first_failure.ok()) { first_failure = s.CloneAndPrepend("Unable to append deletion record(s) to block metadata"); } // Purge the blocks that failed to delete. 
clbs.resize(deleted_block_ids.size()); - } else { - // Metadata files of containers with very few live blocks will be compacted. - if (!container->read_only() && FLAGS_log_container_metadata_runtime_compact && - container->ShouldCompact()) { - scoped_refptr<LogBlockContainer> self(container); - container->ExecClosure([self]() { self->CompactMetadata(); }); - } } if (deleted) { @@ -2787,9 +2906,17 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids, return first_failure; } +void LogBlockContainerNativeMeta::PostWorkOfBlocksDeleted() { + // Metadata files of containers with very few live blocks will be compacted. + if (!read_only() && FLAGS_log_container_metadata_runtime_compact && ShouldCompact()) { + scoped_refptr<LogBlockContainerNativeMeta> self(this); + ExecClosure([self]() { self->CompactMetadata(); }); + } +} + Status LogBlockManager::RemoveLogBlock(const BlockId& block_id, LogBlockRefPtr* lb) { - int index = block_id.id() & kBlockMapMask; + auto index = block_id.id() & kBlockMapMask; std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock); auto& blocks_by_block_id = managed_block_shards_[index].blocks_by_block_id; @@ -2862,8 +2989,7 @@ void LogBlockManager::OpenDataDir( // Add a new result for the container. 
results->emplace_back(new internal::LogBlockContainerLoadResult()); LogBlockContainerRefPtr container; - s = LogBlockContainer::Open( - this, dir, &results->back()->report, container_name, &container); + s = OpenContainer(dir, &results->back()->report, container_name, &container); if (containers_processed) { ++*containers_processed; if (metrics_) { @@ -3067,71 +3193,79 @@ void LogBlockManager::RepairTask(Dir* dir, internal::LogBlockContainerLoadResult WARN_NOT_OK(s_, msg); \ } while (0) -Status LogBlockManager::Repair( - Dir* dir, +void LogBlockManager::FindContainersToRepair( FsReport* report, - vector<LogBlockRefPtr> need_repunching, - vector<LogBlockContainerRefPtr> dead_containers, - unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) { - if (opts_.read_only) { - LOG(INFO) << "Read-only block manager, skipping repair"; - return Status::OK(); - } - if (report->HasFatalErrors()) { - LOG(WARNING) << "Found fatal and irreparable errors, skipping repair"; - return Status::OK(); + const ContainerBlocksByName& /*low_live_block_containers*/, + ContainersByName* containers_by_name) { + if (report->partial_record_check) { + for (const auto& pr : report->partial_record_check->entries) { + LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, pr.container); + if (c) { + (*containers_by_name)[pr.container] = c; + } + } } - // From here on out we're committed to repairing. - - // Fetch all the containers we're going to need. - unordered_map<string, LogBlockContainerRefPtr> containers_by_name; - { - std::lock_guard<simple_spinlock> l(lock_); - - // Remove all of the dead containers from the block manager. They will be - // deleted from disk shortly thereafter, outside of the lock. - for (const auto& d : dead_containers) { - CHECK(d->TrySetDead()); - RemoveDeadContainerUnlocked(d->ToString()); - - // This must be true if clearing dead_containers below is to delete the - // container files. 
- DCHECK(d->HasOneRef()); + if (report->full_container_space_check) { + for (const auto& fcp : report->full_container_space_check->entries) { + LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, fcp.container); + if (c) { + (*containers_by_name)[fcp.container] = c; + } } + } +} - // Fetch all the containers we're going to need. - if (report->partial_record_check) { - for (const auto& pr : report->partial_record_check->entries) { - LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, - pr.container); - if (c) { - containers_by_name[pr.container] = c; - } - } +void LogBlockManagerNativeMeta::FindContainersToRepair( + FsReport* report, + const ContainerBlocksByName& low_live_block_containers, + ContainersByName* containers_by_name) { + LogBlockManager::FindContainersToRepair(report, low_live_block_containers, containers_by_name); + for (const auto& e : low_live_block_containers) { + LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, e.first); + if (c) { + (*containers_by_name)[e.first] = c; } - if (report->full_container_space_check) { - for (const auto& fcp : report->full_container_space_check->entries) { - LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, - fcp.container); - if (c) { - containers_by_name[fcp.container] = c; - } + } +} + + +Status LogBlockManager::DoRepair( + Dir* /*dir*/, + FsReport* report, + const ContainerBlocksByName& /*low_live_block_containers*/, + const ContainersByName& containers_by_name) { + // Truncate any excess preallocated space in full containers. + // + // This is a non-fatal inconsistency; we can just as easily ignore the extra + // disk space consumption. + if (report->full_container_space_check) { + for (auto& fcp : report->full_container_space_check->entries) { + LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, fcp.container); + if (!container) { + // The container was deleted outright. 
+ fcp.repaired = true; + continue; } - } - for (const auto& e : low_live_block_containers) { - LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, - e.first); - if (c) { - containers_by_name[e.first] = c; + + Status s = container->TruncateDataToNextBlockOffset(); + if (s.ok()) { + fcp.repaired = true; } + WARN_NOT_OK(s, "could not truncate excess preallocated space"); } } - // Clearing this vector drops the last references to the containers within, - // triggering their deletion. - dead_containers.clear(); + return Status::OK(); +} +Status LogBlockManagerNativeMeta::DoRepair( + Dir* dir, + FsReport* report, + const ContainerBlocksByName& low_live_block_containers, + const ContainersByName& containers_by_name) { + RETURN_NOT_OK(LogBlockManager::DoRepair(dir, report, low_live_block_containers, + containers_by_name)); // Truncate partial metadata records. // // This is a fatal inconsistency; if the repair fails, we cannot proceed. @@ -3155,18 +3289,19 @@ Status LogBlockManager::Repair( "could not reopen container to truncate partial metadata record"); RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Truncate(pr.offset), - "could not truncate partial metadata record"); + "could not truncate partial metadata record"); // Technically we've "repaired" the inconsistency if the truncation // succeeded, even if the following logic fails. pr.repaired = true; - RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Close(), - "could not close container after truncating partial metadata record"); + RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND( + file->Close(), "could not close container after truncating partial metadata record"); // Reopen the PB writer so that it will refresh its metadata about the // underlying file and resume appending to the new end of the file. 
- RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(container->ReopenMetadataWriter(), + RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND( + down_cast<LogBlockContainerNativeMeta*>(container.get())->ReopenMetadataWriter(), "could not reopen container metadata file"); } } @@ -3181,7 +3316,6 @@ Status LogBlockManager::Repair( if (!s.ok() && !s.IsNotFound()) { WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container metadata file"); } - s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix)); if (!s.ok() && !s.IsNotFound()) { WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container data file"); @@ -3190,42 +3324,6 @@ Status LogBlockManager::Repair( } } - // Truncate any excess preallocated space in full containers. - // - // This is a non-fatal inconsistency; we can just as easily ignore the extra - // disk space consumption. - if (report->full_container_space_check) { - for (auto& fcp : report->full_container_space_check->entries) { - LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, fcp.container); - if (!container) { - // The container was deleted outright. - fcp.repaired = true; - continue; - } - - Status s = container->TruncateDataToNextBlockOffset(); - if (s.ok()) { - fcp.repaired = true; - } - WARN_NOT_OK(s, "could not truncate excess preallocated space"); - } - } - - // Repunch all requested holes. Any excess space reclaimed was already - // tracked by LBMFullContainerSpaceCheck. - // - // Register deletions to a single BlockDeletionTransaction. So, the repunched - // holes belonging to the same container can be coalesced. - auto transaction = std::make_shared<LogBlockDeletionTransaction>(this); - for (const auto& b : need_repunching) { - b->RegisterDeletion(transaction); - transaction->AddBlock(b); - } - - // Clearing this vector drops the last references to the LogBlocks within, - // triggering the repunching operations. 
- need_repunching.clear(); - // "Compact" metadata files with few live blocks by rewriting them with only // the live block records. std::atomic<int64_t> metadata_files_compacted = 0; @@ -3244,7 +3342,7 @@ Status LogBlockManager::Repair( } dir->ExecClosure([this, &metadata_files_compacted, &metadata_bytes_delta, - &seen_fatal_error, &first_fatal_error, e, container]() { + &seen_fatal_error, &first_fatal_error, e, container]() { // Rewrite this metadata file. int64_t file_bytes_delta; const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix); @@ -3256,7 +3354,7 @@ Status LogBlockManager::Repair( } // Reopen the new metadata file. - s = container->ReopenMetadataWriter(); + s = down_cast<LogBlockContainerNativeMeta*>(container.get())->ReopenMetadataWriter(); if (!s.ok()) { // Open the new metadata file failure is fatal, stop processing other containers. bool current_seen_error = false; @@ -3299,6 +3397,66 @@ Status LogBlockManager::Repair( return Status::OK(); } +Status LogBlockManager::Repair( + Dir* dir, + FsReport* report, + vector<LogBlockRefPtr> need_repunching, + vector<LogBlockContainerRefPtr> dead_containers, + const ContainerBlocksByName& low_live_block_containers) { + if (opts_.read_only) { + LOG(INFO) << "Read-only block manager, skipping repair"; + return Status::OK(); + } + if (report->HasFatalErrors()) { + LOG(WARNING) << "Found fatal and irreparable errors, skipping repair"; + return Status::OK(); + } + + // From here on out we're committed to repairing. + ContainersByName containers_by_name; + { + std::lock_guard <simple_spinlock> l(lock_); + + // 1. Remove all of the dead containers from the block manager. They will be + // deleted from disk shortly thereafter, outside of the lock. + for (const auto& d : dead_containers) { + CHECK(d->TrySetDead()); + RemoveDeadContainerUnlocked(d->ToString()); + + // This must be true if clearing dead_containers below is to delete the + // container files. + DCHECK(d->HasOneRef()); + } + + // 2. 
Fetch all the containers we're going to need. + FindContainersToRepair(report, low_live_block_containers, &containers_by_name); + } + + // 3. Clearing this vector drops the last references to the containers within, + // triggering their deletion. + dead_containers.clear(); + + // 4. Repair the containers according to 'report'. + RETURN_NOT_OK(DoRepair(dir, report, low_live_block_containers, containers_by_name)); + + // 5. Repunch all requested holes. Any excess space reclaimed was already + // tracked by LBMFullContainerSpaceCheck. + // + // Register deletions to a single BlockDeletionTransaction. So, the repunched + // holes belonging to the same container can be coalesced. + auto transaction = std::make_shared<LogBlockDeletionTransaction>(this); + for (const auto& b : need_repunching) { + b->RegisterDeletion(transaction); + transaction->AddBlock(b); + } + + // 6. Clearing this vector drops the last references to the LogBlocks within, + // triggering the repunching operations. + need_repunching.clear(); + + return Status::OK(); +} + Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container, const vector<BlockRecordPB>& records, int64_t* file_bytes_delta) { diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h index 0e5fffab8..830d1798f 100644 --- a/src/kudu/fs/log_block_manager.h +++ b/src/kudu/fs/log_block_manager.h @@ -57,6 +57,7 @@ struct FsReport; namespace internal { class LogBlock; class LogBlockContainer; +class LogBlockContainerNativeMeta; class LogBlockDeletionTransaction; class LogWritableBlock; struct LogBlockContainerLoadResult; @@ -65,6 +66,9 @@ struct LogBlockManagerMetrics; typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr; typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr; +typedef scoped_refptr<internal::LogBlockContainerNativeMeta> LogBlockContainerNativeMetaRefPtr; +typedef std::unordered_map<std::string, std::vector<BlockRecordPB>> ContainerBlocksByName; +typedef 
std::unordered_map<std::string, LogBlockContainerRefPtr> ContainersByName; // A log-backed (i.e. sequentially allocated file) block storage // implementation. @@ -205,7 +209,6 @@ class LogBlockManager : public BlockManager { FileCache* file_cache, BlockManagerOptions opts); - private: FRIEND_TEST(LogBlockManagerTest, TestAbortBlock); FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock); FRIEND_TEST(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup); @@ -220,6 +223,7 @@ class LogBlockManager : public BlockManager { FRIEND_TEST(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer); friend class internal::LogBlockContainer; + friend class internal::LogBlockContainerNativeMeta; friend class internal::LogBlockDeletionTransaction; friend class internal::LogWritableBlock; friend class LogBlockManagerTest; @@ -253,6 +257,22 @@ class LogBlockManager : public BlockManager { BlockIdHash, BlockIdEqual> BlockRecordMap; + // Creates a new LogBlockContainer in 'dir', 'container' will be set as the newly created + // container only if created successfully. + // + // Returns Status::OK() if created successfully, otherwise returns an error. + virtual Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) = 0; + + // Opens an existing LogBlockContainer in 'dir' identified by 'id', 'container' will be set as the + // opened container only if created successfully. + // + // Returns Status::OK() if created successfully, otherwise returns an error, the error is recorded + // in 'report'. + virtual Status OpenContainer(Dir* dir, + FsReport* report, + const std::string& id, + LogBlockContainerRefPtr* container) = 0; + // Adds an as of yet unseen container to this block manager. // // Must be called with 'lock_' held. 
@@ -340,9 +360,26 @@ class LogBlockManager : public BlockManager { FsReport* report, std::vector<LogBlockRefPtr> need_repunching, std::vector<LogBlockContainerRefPtr> dead_containers, - std::unordered_map< - std::string, - std::vector<BlockRecordPB>> low_live_block_containers); + const ContainerBlocksByName& low_live_block_containers); + + // Fetch all the containers we're going to repair. + // + // The issues to repair are described in 'report' and 'low_live_block_containers', the containers + // are returned by 'containers_by_name'. + virtual void FindContainersToRepair(FsReport* report, + const ContainerBlocksByName& low_live_block_containers, + ContainersByName* containers_by_name); + + // Do the repair work. + // + // The following repairs will be performed: + // 1. Container data files in 'report->full_container_space_check' will be truncated. + // + // Returns an error if repairing a fatal inconsistency failed. + virtual Status DoRepair(Dir* dir, + FsReport* report, + const ContainerBlocksByName& low_live_block_containers, + const ContainersByName& containers_by_name); // Rewrites a container metadata file, adding all entries in 'records'. // The new metadata file is created as a temporary file and renamed over the @@ -498,6 +535,35 @@ class LogBlockManagerNativeMeta : public LogBlockManager { BlockManagerOptions opts) : LogBlockManager(env, dd_manager, error_manager, file_cache, std::move(opts)) { } + + private: + Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) override; + + // Returns Status::Aborted() in the case that the metadata and data files + // both appear to have no data (e.g. due to a crash just after creating + // one of them but before writing any records). 
+ Status OpenContainer(Dir* dir, + FsReport* report, + const std::string& id, + LogBlockContainerRefPtr* container) override; + + void FindContainersToRepair(FsReport* report, + const ContainerBlocksByName& low_live_block_containers, + ContainersByName* containers_by_name) override; + + // Do the repair work. + // + // The following repairs will be performed: + // 1. Repairs in LogBlockManager::DoRepair(). + // 2. Container metadata files in 'report->partial_record_check' will be truncated. + // 3. Container data and metadata files in 'report->incomplete_container_check' will be deleted. + // 4. Container metadata files in 'low_live_block_containers' will be compacted. + // + // Returns an error if repairing a fatal inconsistency failed. + Status DoRepair(Dir* dir, + FsReport* report, + const ContainerBlocksByName& low_live_block_containers, + const ContainersByName& containers_by_name) override; }; } // namespace fs
