This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 68797aab6a1740da15a24cb0681fe9be7685b787
Author: Yingchun Lai <[email protected]>
AuthorDate: Sun Mar 12 22:41:48 2023 +0800

    KUDU-3371 [fs] make LogBlockContainer a base class
    
    This patch makes LogBlockContainer a base class for the newly
    added LogBlockContainerNativeMeta.
    
    LogBlockContainerNativeMeta is a log-backed block container that
    stores the container's metadata in a sequentially written file,
    which is what LogBlockContainer did prior to this change.
    
    Change-Id: Ieb199de95973aaeba76947ad095907272d84ca67
    Reviewed-on: http://gerrit.cloudera.org:8080/19610
    Tested-by: Kudu Jenkins
    Reviewed-by: Yuqi Du <[email protected]>
---
 src/kudu/fs/log_block_manager.cc | 750 ++++++++++++++++++++++++---------------
 src/kudu/fs/log_block_manager.h  |  74 +++-
 2 files changed, 524 insertions(+), 300 deletions(-)

diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index bc1852d42..cc3ec7427 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -205,6 +205,7 @@ METRIC_DEFINE_gauge_uint64(server, 
log_block_manager_processed_containers_startu
 
 using kudu::fs::internal::LogBlock;
 using kudu::fs::internal::LogBlockContainer;
+using kudu::fs::internal::LogBlockContainerNativeMeta;
 using kudu::fs::internal::LogBlockDeletionTransaction;
 using kudu::fs::internal::LogWritableBlock;
 using kudu::pb_util::ReadablePBContainerFile;
@@ -427,28 +428,8 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
     NO_SYNC
   };
 
-  // Creates a new block container in 'dir'.
-  static Status Create(LogBlockManager* block_manager,
-                       Dir* dir,
-                       LogBlockContainerRefPtr* container);
-
-  // Opens an existing block container in 'dir'.
-  //
-  // Every container is comprised of two parts: "<dir>/<id>.data" and
-  // metadata (e.g. "<dir>/<id>.metadata"). Together, 'dir' and 'id'
-  // fully describe both files.
-  //
-  // Returns Status::Aborted() in the case that the metadata and data files
-  // both appear to have no data (e.g. due to a crash just after creating
-  // one of them but before writing any records). This is recorded in 'report'.
-  static Status Open(LogBlockManager* block_manager,
-                     Dir* dir,
-                     FsReport* report,
-                     const string& id,
-                     LogBlockContainerRefPtr* container);
-
   // The destructor will delete files of this container if it is dead.
-  ~LogBlockContainer();
+  virtual ~LogBlockContainer() = default;
 
   // Closes a set of blocks belonging to this container, possibly synchronizing
   // the dirty data and metadata to disk.
@@ -458,7 +439,7 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
 
   // Frees the space associated with a block or a group of blocks at 'offset'
   // and 'length'. This is a physical operation, not a logical one; a separate
-  // AppendMetadataForDeleteRecords() is required to record the deletions in
+  // RemoveBlockIdsFromMetadata() is required to record the deletions in
   // container metadata.
   //
   // The on-disk effects of this call are made durable only after SyncData().
@@ -488,18 +469,18 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   // See RWFile::ReadV().
   Status ReadVData(int64_t offset, ArrayView<Slice> results) const;
 
-  // Appends block ids to delete to this container's metadata file according 
to 'lbs',
-  // the block ids deleted successfully are returned by 'deleted_block_ids', 
even if
+  // Removes block ids from this container's metadata part according to 'lbs',
+  // the block ids removed successfully are returned by 'deleted_block_ids', 
even if
   // returning non-OK status.
   //
   // The on-disk effects of this call are made durable only after 
SyncMetadata().
-  Status AppendMetadataForDeleteRecords(const vector<LogBlockRefPtr>& lbs,
-                                        vector<BlockId>* deleted_block_ids);
+  virtual Status RemoveBlockIdsFromMetadata(const vector<LogBlockRefPtr>& lbs,
+                                            vector<BlockId>* 
deleted_block_ids) = 0;
 
-  // Appends 'blocks' to add to this container's metadata file.
+  // Adds 'blocks' to this container's metadata part.
   //
   // The on-disk effects of this call are made durable only after 
SyncMetadata().
-  Status AppendMetadataForCreateRecords(const vector<LogWritableBlock*>& 
blocks);
+  virtual Status AddBlockIdsToMetadata(const vector<LogWritableBlock*>& 
blocks) = 0;
 
   // Asynchronously flush this container's data file from 'offset' through
   // to 'length'.
@@ -513,15 +494,11 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   // TODO(unknown): Add support to synchronize just a range.
   Status SyncData();
 
-  // Synchronize this container's metadata file with the disk. On success,
+  // Synchronize this container's metadata part with the disk. On success,
   // guarantees that the metadata is made durable.
   //
   // TODO(unknown): Add support to synchronize just a range.
-  Status SyncMetadata();
-
-  // Reopen the metadata file record writer. Should be called if the underlying
-  // file was changed.
-  Status ReopenMetadataWriter();
+  virtual Status SyncMetadata() = 0;
 
   // Truncates this container's data file to 'next_block_offset_' if it is
   // full. This effectively removes any preallocated but unused space.
@@ -540,6 +517,7 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
     kReadOnly,       // Read records only.
     kReadAndUpdate,  // Read records and update container's statistic.
   };
+
   // Reads the container's metadata from disk, sanity checking and processing
   // records along the way.
   //
@@ -550,13 +528,13 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   //
   // Returns an error only if there was a problem accessing the container from
   // disk; such errors are fatal and effectively halt processing immediately.
-  Status ProcessRecords(
+  virtual Status ProcessRecords(
       FsReport* report,
       LogBlockManager::UntrackedBlockMap* live_blocks,
       LogBlockManager::BlockRecordMap* live_block_records,
       vector<LogBlockRefPtr>* dead_blocks,
       uint64_t* max_block_id,
-      ProcessRecordType type);
+      ProcessRecordType type) = 0;
 
   // Updates internal bookkeeping state to reflect the creation of a block.
   void BlockCreated(const LogBlockRefPtr& block);
@@ -602,6 +580,7 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
 
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
+  const string& id() const { return id_; }
   int64_t next_block_offset() const { return next_block_offset_.Load(); }
   int64_t total_bytes() const { return total_bytes_.Load(); }
   int64_t total_blocks() const { return total_blocks_.Load(); }
@@ -609,24 +588,7 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); }
   int64_t live_blocks() const { return live_blocks_.Load(); }
   int32_t blocks_being_written() const { return blocks_being_written_.Load(); }
-  bool full() const {
-    if (next_block_offset() >= FLAGS_log_container_max_size ||
-        (max_num_blocks_ && total_blocks() >= max_num_blocks_)) {
-      return true;
-    }
-
-    if (FLAGS_log_container_metadata_max_size <= 0) {
-      return false;
-    }
-
-    // Try lock before reading metadata offset, consider it not full if lock 
failed.
-    shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock);
-    if (!l.owns_lock()) {
-      return false;
-    }
-    return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size;
-  }
-
+  virtual bool full() const { return data_full(); }
   bool dead() const { return dead_.Load(); }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   Dir* data_dir() const { return data_dir_; }
@@ -663,53 +625,14 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
     return dead_.CompareAndSet(false, true);
   }
 
-  bool ShouldCompact() const {
-    shared_lock<RWMutex> l(metadata_compact_lock_);
-    return ShouldCompactUnlocked();
-  }
-
-  bool ShouldCompactUnlocked() const {
-    DCHECK_GT(FLAGS_log_container_metadata_max_size, 0);
-    if (live_blocks() >=
-        total_blocks() * 
FLAGS_log_container_live_metadata_before_compact_ratio) {
-      return false;
-    }
-
-    return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size *
-                                       
FLAGS_log_container_metadata_size_before_compact_ratio;
-  }
-
-  void CompactMetadata();
+  // Some work will be triggered after blocks have been removed from this 
container successfully.
+  virtual void PostWorkOfBlocksDeleted() {}
 
   static std::vector<BlockRecordPB> 
SortRecords(LogBlockManager::BlockRecordMap live_block_records);
 
- private:
-  LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir,
-                    unique_ptr<WritablePBContainerFile> metadata_file,
-                    shared_ptr<RWFile> data_file);
-
-  // Check the container whether it is fine.
-  //
-  // OK: both files of the container exist;
-  // Aborted: the container will be repaired later;
-  // NotFound: one file of the container has gone missing;
-  // 
*-------------*------------*----------------*-------------------------*---------------------*
-  // |DATA\METADATA| NONE EXIST | EXIST && < MIN | EXIST && NO LIVE BLOCKS | 
EXIST && LIVE BLOCKS|
-  // 
*-------------*------------*----------------*-------------------------*---------------------*
-  // | NONE EXIST  |            |    Aborted     |      Aborted            |   
   NotFound       |
-  // 
*-------------*------------*----------------*-------------------------*---------------------*
-  // | EXIST && 0  |  Aborted   |    Aborted     |      OK                 |   
   OK             |
-  // 
*-------------*------------*----------------*-------------------------*---------------------*
-  // | EXIST && >0 |  NotFound  |    OK          |      OK                 |   
   OK             |
-  // 
*-------------*------------*----------------*-------------------------*---------------------*
-  //
-  // Note: the status here only represents the result of check.
-  static Status CheckContainerFiles(LogBlockManager* block_manager,
-                                    FsReport* report,
-                                    const Dir* dir,
-                                    const string& common_path,
-                                    const string& data_path,
-                                    const string& metadata_path);
+ protected:
+  LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir, string id,
+                    shared_ptr<RWFile> data_file, uint64_t data_file_size);
 
   // Processes a single block record, performing sanity checks on it and adding
   // it either to 'live_blocks' or 'dead_blocks'. If the record is live, it is
@@ -735,6 +658,15 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
       uint64_t* max_block_id,
       ProcessRecordType type);
 
+  // Similar to above, but only processes a single DELETE type block record.
+  virtual void ProcessDeleteRecord(
+      BlockRecordPB* record,
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      LogBlockManager::BlockRecordMap* live_block_records,
+      vector<LogBlockRefPtr>* dead_blocks,
+      ProcessRecordType type) = 0;
+
   // Updates this container data file's position based on the offset and length
   // of a block, marking this container as full if needed. Should only be 
called
   // when a block is fully written, or after an encryption header is written, 
as
@@ -743,23 +675,26 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   // This function is thread unsafe.
   void UpdateNextBlockOffset(int64_t block_offset, int64_t block_length);
 
+  // Check whether the data file is full.
+  bool data_full() const {
+    return next_block_offset() >= FLAGS_log_container_max_size ||
+           (max_num_blocks_ && total_blocks() >= max_num_blocks_);
+  }
+
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
 
   // The data directory where the container lives.
   Dir* data_dir_;
 
+  // The unique id of this container.
+  string id_;
+
   const optional<int64_t> max_num_blocks_;
 
   // Offset up to which we have preallocated bytes.
   int64_t preallocated_offset_ = 0;
 
-  // Protect 'metadata_file_', only rewriting should add write lock,
-  // appending and syncing only need read lock, cause there is an
-  // internal lock for these operations in WritablePBContainerFile.
-  mutable RWMutex metadata_compact_lock_;
-  // Opened file handles to the container's files.
-  unique_ptr<WritablePBContainerFile> metadata_file_;
   shared_ptr<RWFile> data_file_;
 
   // The offset of the next block to be written to the container.
@@ -768,9 +703,6 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   // The amount of data (post block alignment) written thus far to the 
container.
   AtomicInt<int64_t> total_bytes_;
 
-  // TODO(yingchun): add metadata bytes for metadata.
-  //AtomicInt<int64_t> metadata_bytes_;
-
   // The number of blocks written thus far in the container.
   AtomicInt<int64_t> total_blocks_;
 
@@ -803,6 +735,151 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainer);
 };
 
+////////////////////////////////////////////////////////////
+// LogBlockContainerNativeMeta
+////////////////////////////////////////////////////////////
+
+// The metadata part is a log-backed file, named "<dir>/<id>.metadata".
+class LogBlockContainerNativeMeta final : public LogBlockContainer {
+ public:
+  // Creates a new LogBlockContainer managed by 'block_manager' in 'dir', 
'container' will be set as
+  // the newly created container.
+  //
+  // Returns Status::OK() if created successfully, otherwise returns an error.
+  static Status Create(LogBlockManager* block_manager,
+                       Dir* dir,
+                       LogBlockContainerRefPtr* container);
+
+  // Opens an existing LogBlockContainer in 'dir'.
+  //
+  // Every container is comprised of two parts: "<dir>/<id>.data" and
+  // metadata (i.e. "<dir>/<id>.metadata"). Together, 'dir' and 'id'
+  // fully describe both files.
+  //
+  // Returns Status::Aborted() in the case that the metadata and data files
+  // both appear to have no data (e.g. due to a crash just after creating
+  // one of them but before writing any records). This is recorded in 'report'.
+  static Status Open(LogBlockManager* block_manager,
+                     Dir* dir,
+                     FsReport* report,
+                     const string& id,
+                     LogBlockContainerRefPtr* container);
+
+  // Check the container whether it is fine.
+  //
+  // OK: both files of the container exist;
+  // Aborted: the container will be repaired later;
+  // NotFound: one file of the container has gone missing;
+  // 
*-------------*------------*----------------*-------------------------*---------------------*
+  // |DATA\METADATA| NONE EXIST | EXIST && < MIN | EXIST && NO LIVE BLOCKS | 
EXIST && LIVE BLOCKS|
+  // 
*-------------*------------*----------------*-------------------------*---------------------*
+  // | NONE EXIST  |            |    Aborted     |      Aborted            |   
   NotFound       |
+  // 
*-------------*------------*----------------*-------------------------*---------------------*
+  // | EXIST && 0  |  Aborted   |    Aborted     |      OK                 |   
   OK             |
+  // 
*-------------*------------*----------------*-------------------------*---------------------*
+  // | EXIST && >0 |  NotFound  |    OK          |      OK                 |   
   OK             |
+  // 
*-------------*------------*----------------*-------------------------*---------------------*
+  //
+  // Note: the status here only represents the result of check.
+  static Status CheckContainerFiles(LogBlockManager* block_manager,
+                                    FsReport* report,
+                                    const Dir* dir,
+                                    const string& common_path,
+                                    const string& data_path,
+                                    const string& metadata_path);
+
+  LogBlockContainerNativeMeta(LogBlockManager* block_manager, Dir* data_dir, 
string id,
+                              shared_ptr<RWFile> data_file, uint64_t 
data_file_size,
+                              unique_ptr<WritablePBContainerFile> 
metadata_file);
+
+  ~LogBlockContainerNativeMeta() override;
+
+  Status RemoveBlockIdsFromMetadata(const vector<LogBlockRefPtr>& lbs,
+                                    vector<BlockId>* deleted_block_ids) 
override;
+
+  Status AddBlockIdsToMetadata(const vector<LogWritableBlock*>& blocks) 
override;
+
+  Status SyncMetadata() override;
+
+  // Reopen the metadata file's record writer. Should be called if the 
underlying
+  // file was changed.
+  Status ReopenMetadataWriter();
+
+  Status ProcessRecords(
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      LogBlockManager::BlockRecordMap* live_block_records,
+      vector<LogBlockRefPtr>* dead_blocks,
+      uint64_t* max_block_id,
+      ProcessRecordType type) override;
+
+  bool full() const override {
+    if (LogBlockContainer::full()) {
+      return true;
+    }
+
+    if (FLAGS_log_container_metadata_max_size <= 0) {
+      return false;
+    }
+
+    // Try lock before reading metadata offset, consider it not full if lock 
failed.
+    shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock);
+    if (!l.owns_lock()) {
+      return false;
+    }
+    return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size;
+  }
+
+  void PostWorkOfBlocksDeleted() override;
+
+ private:
+  // Performs sanity checks on a DELETE record, removes its block from
+  // 'live_blocks' and 'live_block_records', and adds it to 'dead_blocks'.
+  //
+  // 'report' is updated with any inconsistencies found in the record.
+  //
+  // Note: 'record' may be swapped into 'report'; do not use it after calling
+  // this function.
+  void ProcessDeleteRecord(
+      BlockRecordPB* record,
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      LogBlockManager::BlockRecordMap* live_block_records,
+      vector<LogBlockRefPtr>* dead_blocks,
+      ProcessRecordType type) override;
+
+  bool ShouldCompactUnlocked() const {
+    DCHECK_GT(FLAGS_log_container_metadata_max_size, 0);
+    if (live_blocks() >=
+        total_blocks() * 
FLAGS_log_container_live_metadata_before_compact_ratio) {
+      return false;
+    }
+
+    return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size *
+                                           
FLAGS_log_container_metadata_size_before_compact_ratio;
+  }
+
+  bool ShouldCompact() const {
+    shared_lock<RWMutex> l(metadata_compact_lock_);
+    return ShouldCompactUnlocked();
+  }
+
+  void CompactMetadata();
+
+  // Protect 'metadata_file_', only rewriting should add write lock,
+  // appending and syncing only need read lock, cause there is an
+  // internal lock for these operations in WritablePBContainerFile.
+  mutable RWMutex metadata_compact_lock_;
+
+  // Opened file handles to the container's files.
+  unique_ptr<WritablePBContainerFile> metadata_file_;
+
+  // TODO(yingchun): add metadata bytes for metadata.
+  //  AtomicInt<int64_t> metadata_bytes_;
+
+  DISALLOW_COPY_AND_ASSIGN(LogBlockContainerNativeMeta);
+};
+
 #define CONTAINER_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
   HANDLE_DISK_FAILURE(s_, 
block_manager_->error_manager_->RunErrorNotificationCb( \
@@ -813,14 +890,15 @@ class LogBlockContainer: public 
RefCountedThreadSafe<LogBlockContainer> {
 LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager,
     Dir* data_dir,
-    unique_ptr<WritablePBContainerFile> metadata_file,
-    shared_ptr<RWFile> data_file)
+    string id,
+    shared_ptr<RWFile> data_file,
+    uint64_t data_file_size)
     : block_manager_(block_manager),
       data_dir_(data_dir),
+      id_(std::move(id)),
       max_num_blocks_(FindOrDie(block_manager->block_limits_by_data_dir_,
                                 data_dir)),
-      metadata_compact_lock_(RWMutex::Priority::PREFER_READING),
-      metadata_file_(std::move(metadata_file)),
+      preallocated_offset_(data_file_size),
       data_file_(std::move(data_file)),
       next_block_offset_(0),
       total_bytes_(0),
@@ -842,7 +920,20 @@ LogBlockContainer::LogBlockContainer(
   }
 }
 
-LogBlockContainer::~LogBlockContainer() {
+LogBlockContainerNativeMeta::LogBlockContainerNativeMeta(
+    LogBlockManager* block_manager,
+    Dir* data_dir,
+    string id,
+    shared_ptr<RWFile> data_file,
+    uint64_t data_file_size,
+    unique_ptr<WritablePBContainerFile> metadata_file)
+    : LogBlockContainer(block_manager, data_dir, std::move(id),
+                        std::move(data_file), data_file_size),
+      metadata_compact_lock_(RWMutex::Priority::PREFER_READING),
+      metadata_file_(std::move(metadata_file)) {
+}
+
+LogBlockContainerNativeMeta::~LogBlockContainerNativeMeta() {
   if (dead()) {
     CHECK(!block_manager_->opts_.read_only);
     string data_file_name = data_file_->filename();
@@ -863,6 +954,8 @@ LogBlockContainer::~LogBlockContainer() {
                              metadata_failure_msg);
     }
   }
+  data_file_.reset();
+  metadata_file_.reset();
 }
 
 void LogBlockContainer::HandleError(const Status& s) const {
@@ -871,7 +964,7 @@ void LogBlockContainer::HandleError(const Status& s) const {
                                                                data_dir_));
 }
 
-void LogBlockContainer::CompactMetadata() {
+void LogBlockContainerNativeMeta::CompactMetadata() {
   SCOPED_LOG_SLOW_EXECUTION(WARNING, 5, Substitute("CompactMetadata $0", 
ToString()));
   // Skip compacting if lock failed to reduce overhead, metadata is on 
compacting or will be
   // compacted next time.
@@ -939,9 +1032,12 @@ void LogBlockContainer::CompactMetadata() {
     
block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
 dir)); \
 } while (0)
 
-Status LogBlockContainer::Create(LogBlockManager* block_manager,
-                                 Dir* dir,
-                                 LogBlockContainerRefPtr* container) {
+Status LogBlockContainerNativeMeta::Create(LogBlockManager* block_manager,
+                                           Dir* dir,
+                                           LogBlockContainerRefPtr* container) 
{
+  DCHECK(container);
+  DCHECK(!container->get());
+  string id;
   string common_path;
   string metadata_path;
   string data_path;
@@ -954,11 +1050,10 @@ Status LogBlockContainer::Create(LogBlockManager* 
block_manager,
   //
   // When looping, we delete any created-and-orphaned files.
   do {
-    common_path = JoinPathSegments(dir->dir(),
-                                   block_manager->oid_generator()->Next());
+    id = block_manager->oid_generator()->Next();
+    common_path = JoinPathSegments(dir->dir(), id);
     metadata_path = StrCat(common_path, 
LogBlockManager::kContainerMetadataFileSuffix);
     data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
-
     if (PREDICT_TRUE(block_manager->file_cache_)) {
       if (metadata_writer) {
         WARN_NOT_OK(block_manager->file_cache_->DeleteFile(metadata_path),
@@ -999,31 +1094,35 @@ Status LogBlockContainer::Create(LogBlockManager* 
block_manager,
     unique_ptr<WritablePBContainerFile> metadata_file(new 
WritablePBContainerFile(
         std::move(metadata_writer)));
     
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB()));
-    container->reset(new LogBlockContainer(block_manager,
-                                           dir,
-                                           std::move(metadata_file),
-                                           std::move(data_file)));
+    container->reset(new LogBlockContainerNativeMeta(block_manager,
+                                                     dir,
+                                                     id,
+                                                     std::move(data_file),
+                                                     0 /*data_file_size*/,
+                                                     
std::move(metadata_file)));
     VLOG(1) << "Created log block container " << (*container)->ToString();
   }
 
   // Prefer metadata status (arbitrarily).
   FsErrorManager* em = block_manager->error_manager();
-  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(
-      ErrorHandlerType::DISK_ERROR, dir));
+  HANDLE_DISK_FAILURE(metadata_status,
+                      em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir));
   HANDLE_DISK_FAILURE(data_status, 
em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
   return !metadata_status.ok() ? metadata_status : data_status;
 }
 
-Status LogBlockContainer::Open(LogBlockManager* block_manager,
-                               Dir* dir,
-                               FsReport* report,
-                               const string& id,
-                               LogBlockContainerRefPtr* container) {
+Status LogBlockContainerNativeMeta::Open(LogBlockManager* block_manager,
+                                         Dir* dir,
+                                         FsReport* report,
+                                         const string& id,
+                                         LogBlockContainerRefPtr* container) {
+  DCHECK(container);
+  DCHECK(!container->get());
   string common_path = JoinPathSegments(dir->dir(), id);
   string data_path = StrCat(common_path, 
LogBlockManager::kContainerDataFileSuffix);
   string metadata_path = StrCat(common_path, 
LogBlockManager::kContainerMetadataFileSuffix);
-  RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir,
-                                    common_path, data_path, metadata_path));
+  RETURN_NOT_OK(CheckContainerFiles(block_manager, report, dir, common_path,
+                                    data_path, metadata_path));
 
   // Open the existing metadata and data files for writing.
   shared_ptr<RWFile> metadata_file;
@@ -1038,11 +1137,11 @@ Status LogBlockContainer::Open(LogBlockManager* 
block_manager,
     opts.mode = Env::MUST_EXIST;
     opts.is_sensitive = true;
     unique_ptr<RWFile> rwf;
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
-        metadata_path, &rwf));
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(
+        opts, metadata_path, &rwf));
     metadata_file.reset(rwf.release());
-    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
-        data_path, &rwf));
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(
+        opts, data_path, &rwf));
     data_file.reset(rwf.release());
   }
 
@@ -1054,22 +1153,19 @@ Status LogBlockContainer::Open(LogBlockManager* 
block_manager,
   RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
 
   // Create the in-memory container and populate it.
-  LogBlockContainerRefPtr open_container(new LogBlockContainer(block_manager,
-                                                               dir,
-                                                               
std::move(metadata_pb_writer),
-                                                               
std::move(data_file)));
-  open_container->preallocated_offset_ = data_file_size;
+  LogBlockContainerRefPtr open_container(new LogBlockContainerNativeMeta(
+      block_manager, dir, id, std::move(data_file), data_file_size, 
std::move(metadata_pb_writer)));
   VLOG(1) << "Opened log block container " << open_container->ToString();
   container->swap(open_container);
   return Status::OK();
 }
 
-Status LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager,
-                                              FsReport* report,
-                                              const Dir* dir,
-                                              const string& common_path,
-                                              const string& data_path,
-                                              const string& metadata_path) {
+Status LogBlockContainerNativeMeta::CheckContainerFiles(LogBlockManager* 
block_manager,
+                                                        FsReport* report,
+                                                        const Dir* dir,
+                                                        const string& 
common_path,
+                                                        const string& 
data_path,
+                                                        const string& 
metadata_path) {
   Env* env = block_manager->env();
   uint64_t data_size = 0;
   Status s_data = env->GetFileSize(data_path, &data_size);
@@ -1151,7 +1247,7 @@ Status 
LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager,
 Status LogBlockContainer::TruncateDataToNextBlockOffset() {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
 
-  if (full()) {
+  if (data_full()) {
     VLOG(2) << Substitute("Truncating container $0 to offset $1",
                           ToString(), next_block_offset());
     RETURN_NOT_OK_HANDLE_ERROR(data_file_->Truncate(next_block_offset()));
@@ -1159,7 +1255,7 @@ Status LogBlockContainer::TruncateDataToNextBlockOffset() 
{
   return Status::OK();
 }
 
-Status LogBlockContainer::ProcessRecords(
+Status LogBlockContainerNativeMeta::ProcessRecords(
     FsReport* report,
     LogBlockManager::UntrackedBlockMap* live_blocks,
     LogBlockManager::BlockRecordMap* live_block_records,
@@ -1216,14 +1312,12 @@ Status LogBlockContainer::ProcessRecord(
     uint64_t* data_file_size,
     uint64_t* max_block_id,
     ProcessRecordType type) {
-  const BlockId block_id(BlockId::FromPB(record->block_id()));
-  LogBlockRefPtr lb;
   switch (record->op_type()) {
-    case CREATE:
+    case CREATE: {
       // First verify that the record's offset/length aren't wildly incorrect.
       if (PREDICT_FALSE(!record->has_offset() ||
                         !record->has_length() ||
-                        record->offset() < 0  ||
+                        record->offset() < 0 ||
                         record->length() < 0)) {
         report->malformed_record_check->entries.emplace_back(ToString(), 
record);
         break;
@@ -1250,7 +1344,8 @@ Status LogBlockContainer::ProcessRecord(
         break;
       }
 
-      lb = new LogBlock(this, block_id, record->offset(), record->length());
+      const BlockId block_id(BlockId::FromPB(record->block_id()));
+      LogBlockRefPtr lb = new LogBlock(this, block_id, record->offset(), 
record->length());
       if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
         // We found a record whose ID matches that of an already created block.
         //
@@ -1279,22 +1374,10 @@ Status LogBlockContainer::ProcessRecord(
       (*live_block_records)[block_id].Swap(record);
       *max_block_id = std::max(*max_block_id, block_id.id());
       break;
+    }
     case DELETE:
-      lb = EraseKeyReturnValuePtr(live_blocks, block_id);
-      if (!lb) {
-        // We found a record for which there is no already created block.
-        //
-        // TODO(adar): treat as a different kind of inconsistency?
-        report->malformed_record_check->entries.emplace_back(ToString(), 
record);
-        break;
-      }
-      VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
-      if (type == ProcessRecordType::kReadAndUpdate) {
-        BlockDeleted(lb);
-      }
-
-      CHECK_EQ(1, live_block_records->erase(block_id));
-      dead_blocks->emplace_back(std::move(lb));
+      ProcessDeleteRecord(record, report, live_blocks,
+                          live_block_records, dead_blocks, type);
       break;
     default:
       // We found a record with an unknown type.
@@ -1306,6 +1389,31 @@ Status LogBlockContainer::ProcessRecord(
   return Status::OK();
 }
 
+void LogBlockContainerNativeMeta::ProcessDeleteRecord(
+        BlockRecordPB* record,
+        FsReport* report,
+        LogBlockManager::UntrackedBlockMap* live_blocks,
+        LogBlockManager::BlockRecordMap* live_block_records,
+        vector<LogBlockRefPtr>* dead_blocks,
+        ProcessRecordType type) {  // Applies a single DELETE record to the maps built so far.
+  const BlockId block_id(BlockId::FromPB(record->block_id()));
+  LogBlockRefPtr lb = EraseKeyReturnValuePtr(live_blocks, block_id);
+  if (!lb) {
+    // No live block matches this DELETE record (nothing was erased from
+    // 'live_blocks'), so report the record as malformed and stop here.
+    // TODO(adar): treat as a different kind of inconsistency?
+    report->malformed_record_check->entries.emplace_back(ToString(), record);
+    return;
+  }
+  VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
+  if (type == ProcessRecordType::kReadAndUpdate) {
+    BlockDeleted(lb);  // Skipped for the other record-processing modes.
+  }
+  // The block was live, so exactly one live record must exist for it: drop it, keep the block.
+  CHECK_EQ(1, live_block_records->erase(block_id));
+  dead_blocks->emplace_back(std::move(lb));
+}
+
 Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& 
blocks,
                                         SyncMode mode) {
   auto sync_blocks = [&]() {
@@ -1316,7 +1424,7 @@ Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks,
 
     // Append metadata only after data is synced so that there's
     // no chance of metadata landing on the disk before the data.
-    RETURN_NOT_OK_PREPEND(AppendMetadataForCreateRecords(blocks),
+    RETURN_NOT_OK_PREPEND(AddBlockIdsToMetadata(blocks),
                           "unable to append creation record(s) to block 
metadata during close");
 
     if (mode == SYNC) {
@@ -1408,8 +1516,8 @@ Status LogBlockContainer::SyncData() {
   return Status::OK();
 }
 
-Status LogBlockContainer::AppendMetadataForDeleteRecords(const 
vector<LogBlockRefPtr>& lbs,
-                                                         vector<BlockId>* 
deleted_block_ids) {
+Status LogBlockContainerNativeMeta::RemoveBlockIdsFromMetadata(
+    const vector<LogBlockRefPtr>& lbs, vector<BlockId>* deleted_block_ids) {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   // Note: We don't check for sufficient disk space for metadata writes in
   // order to allow for block deletion on full disks.
@@ -1441,7 +1549,8 @@ Status LogBlockContainer::AppendMetadataForDeleteRecords(const vector<LogBlockRe
   return Status::OK();
 }
 
-Status LogBlockContainer::AppendMetadataForCreateRecords(const 
vector<LogWritableBlock*>& blocks) {
+Status LogBlockContainerNativeMeta::AddBlockIdsToMetadata(
+    const vector<LogWritableBlock*>& blocks) {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   vector<BlockRecordPB> records;
   records.reserve(blocks.size());
@@ -1463,7 +1572,7 @@ Status LogBlockContainer::AppendMetadataForCreateRecords(const vector<LogWritabl
   return Status::OK();
 }
 
-Status LogBlockContainer::SyncMetadata() {
+Status LogBlockContainerNativeMeta::SyncMetadata() {
   VLOG(3) << "Syncing metadata file " << metadata_file_->filename();
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   if (FLAGS_enable_data_block_fsync) {
@@ -1474,7 +1583,7 @@ Status LogBlockContainer::SyncMetadata() {
   return Status::OK();
 }
 
-Status LogBlockContainer::ReopenMetadataWriter() {
+Status LogBlockContainerNativeMeta::ReopenMetadataWriter() {
   shared_ptr<RWFile> f;
   if (PREDICT_TRUE(block_manager_->file_cache_)) {
     
RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenFile<Env::MUST_EXIST>(
@@ -1484,8 +1593,8 @@ Status LogBlockContainer::ReopenMetadataWriter() {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
     opts.is_sensitive = true;
-    RETURN_NOT_OK_HANDLE_ERROR(block_manager_->env_->NewRWFile(opts,
-        metadata_file_->filename(), &f_uniq));
+    RETURN_NOT_OK_HANDLE_ERROR(
+        block_manager_->env_->NewRWFile(opts, metadata_file_->filename(), 
&f_uniq));
     f.reset(f_uniq.release());
   }
   unique_ptr<WritablePBContainerFile> w;
@@ -1567,7 +1676,7 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t bloc
       instance()->filesystem_block_size_bytes());
   next_block_offset_.StoreMax(new_next_block_offset);
 
-  if (full()) {
+  if (data_full()) {
     VLOG(1) << Substitute(
         "Container $0 with size $1 is now full, max size is $2",
         ToString(), next_block_offset(), FLAGS_log_container_max_size);
@@ -1846,7 +1955,7 @@ struct LogBlockContainerLoadResult {
   vector<LogBlockContainerRefPtr> dead_containers;
   // Keep track of containers whose live block ratio is low; their metadata
   // files will be compacted during repair.
-  unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;
+  ContainerBlocksByName low_live_block_containers;
   // Keep track of deleted blocks whose space hasn't been punched; they will
   // be repunched during repair.
   vector<LogBlockRefPtr> need_repunching_blocks;
@@ -2246,12 +2355,12 @@ LogBlockManager::LogBlockManager(Env* env,
     next_block_id_(1) {
   managed_block_shards_.resize(kBlockMapChunk);
   for (auto& mb : managed_block_shards_) {
-    mb.lock = std::unique_ptr<simple_spinlock>(new simple_spinlock());
+    mb.lock = unique_ptr<simple_spinlock>(new simple_spinlock());
     mb.blocks_by_block_id
-        = std::unique_ptr<BlockMap>(new BlockMap(10,
-                                                 BlockMap::hasher(),
-                                                 BlockMap::key_equal(),
-                                                 
BlockAllocator(mem_tracker_)));
+        = unique_ptr<BlockMap>(new BlockMap(10,
+                                            BlockMap::hasher(),
+                                            BlockMap::key_equal(),
+                                            BlockAllocator(mem_tracker_)));
     mb.blocks_by_block_id->set_deleted_key(BlockId());
   }
 
@@ -2308,6 +2417,21 @@ LogBlockManager::~LogBlockManager() {
     }                                                           \
   } while (false)
 
+Status LogBlockManagerNativeMeta::CreateContainer(Dir* dir, 
LogBlockContainerRefPtr* container) {
+  DCHECK(container);  // The out-parameter itself must be non-null...
+  DCHECK(!container->get());  // ...and must not already reference a container.
+  return LogBlockContainerNativeMeta::Create(this, dir, container);  // Status is passed through.
+}
+
+Status LogBlockManagerNativeMeta::OpenContainer(Dir* dir,
+                                                FsReport* report,
+                                                const string& id,
+                                                LogBlockContainerRefPtr* 
container) {
+  DCHECK(container);  // The out-parameter itself must be non-null...
+  DCHECK(!container->get());  // ...and must not already reference a container.
+  return LogBlockContainerNativeMeta::Open(this, dir, report, id, container);  // Status passed through.
+}
+
 Status LogBlockManager::Open(FsReport* report, std::atomic<int>* 
containers_processed,
                              std::atomic<int>* containers_total) {
   // Establish (and log) block limits for each data directory using kernel,
@@ -2491,12 +2615,12 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id,
                                   unique_ptr<ReadableBlock>* block) {
   LogBlockRefPtr lb;
   {
-    int index = block_id.id() & kBlockMapMask;
+    auto index = block_id.id() & kBlockMapMask;
     std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
     lb = FindPtrOrNull(*managed_block_shards_[index].blocks_by_block_id, 
block_id);
   }
   if (!lb) {
-    return Status::NotFound("Can't find block", block_id.ToString());
+    return Status::NotFound("Can't find block when opening log block", 
block_id.ToString());
   }
 
   VLOG(3) << "Opened block " << block_id
@@ -2586,7 +2710,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
 
   // All containers are in use; create a new one.
   LogBlockContainerRefPtr new_container;
-  Status s = LogBlockContainer::Create(this, dir, &new_container);
+  Status s = CreateContainer(dir, &new_container);
 
   // We could create a container in a different directory, but there's
   // currently no point in doing so. On disk failure, the tablet specified by
@@ -2649,7 +2773,7 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
     return false;
   }
 
-  int index = block_id.id() & kBlockMapMask;
+  auto index = block_id.id() & kBlockMapMask;
   std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
   if (ContainsKey(*managed_block_shards_[index].blocks_by_block_id, block_id)) 
{
     return false;
@@ -2754,7 +2878,7 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids,
     //
     // TODO(unknown): what if this fails? Should we restore the in-memory 
block?
     vector<BlockId> deleted_block_ids;
-    Status s = container->AppendMetadataForDeleteRecords(clbs, 
&deleted_block_ids);
+    Status s = container->RemoveBlockIdsFromMetadata(clbs, &deleted_block_ids);
 
     // We don't bother fsyncing the metadata append for deletes in order to 
avoid
     // the disk overhead. Even if we did fsync it, we'd still need to account 
for
@@ -2763,19 +2887,14 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids,
     //
     // TODO(KUDU-829): Implement GC of orphaned blocks.
     // TODO(yingchun): Add some metrics to track the number of orphaned blocks.
-    if (!s.ok()) {
+    if (s.ok()) {
+      container->PostWorkOfBlocksDeleted();
+    } else {
       if (first_failure.ok()) {
         first_failure = s.CloneAndPrepend("Unable to append deletion record(s) 
to block metadata");
       }
       // Purge the blocks that failed to delete.
       clbs.resize(deleted_block_ids.size());
-    } else {
-      // Metadata files of containers with very few live blocks will be 
compacted.
-      if (!container->read_only() && 
FLAGS_log_container_metadata_runtime_compact &&
-          container->ShouldCompact()) {
-        scoped_refptr<LogBlockContainer> self(container);
-        container->ExecClosure([self]() { self->CompactMetadata(); });
-      }
     }
 
     if (deleted) {
@@ -2787,9 +2906,17 @@ Status LogBlockManager::RemoveLogBlocks(const vector<BlockId>& block_ids,
   return first_failure;
 }
 
+void LogBlockContainerNativeMeta::PostWorkOfBlocksDeleted() {
+  // Post-deletion hook: compact the metadata file if this container has very few live blocks.
+  if (!read_only() && FLAGS_log_container_metadata_runtime_compact && 
ShouldCompact()) {
+    scoped_refptr<LogBlockContainerNativeMeta> self(this);  // Ref held by the closure.
+    ExecClosure([self]() { self->CompactMetadata(); });
+  }
+}
+
 Status LogBlockManager::RemoveLogBlock(const BlockId& block_id,
                                        LogBlockRefPtr* lb) {
-  int index = block_id.id() & kBlockMapMask;
+  auto index = block_id.id() & kBlockMapMask;
   std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
   auto& blocks_by_block_id = managed_block_shards_[index].blocks_by_block_id;
 
@@ -2862,8 +2989,7 @@ void LogBlockManager::OpenDataDir(
     // Add a new result for the container.
     results->emplace_back(new internal::LogBlockContainerLoadResult());
     LogBlockContainerRefPtr container;
-    s = LogBlockContainer::Open(
-        this, dir, &results->back()->report, container_name, &container);
+    s = OpenContainer(dir, &results->back()->report, container_name, 
&container);
     if (containers_processed) {
       ++*containers_processed;
       if (metrics_) {
@@ -3067,71 +3193,79 @@ void LogBlockManager::RepairTask(Dir* dir, internal::LogBlockContainerLoadResult
   WARN_NOT_OK(s_, msg); \
 } while (0)
 
-Status LogBlockManager::Repair(
-    Dir* dir,
+void LogBlockManager::FindContainersToRepair(
     FsReport* report,
-    vector<LogBlockRefPtr> need_repunching,
-    vector<LogBlockContainerRefPtr> dead_containers,
-    unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) {
-  if (opts_.read_only) {
-    LOG(INFO) << "Read-only block manager, skipping repair";
-    return Status::OK();
-  }
-  if (report->HasFatalErrors()) {
-    LOG(WARNING) << "Found fatal and irreparable errors, skipping repair";
-    return Status::OK();
+    const ContainerBlocksByName& /*low_live_block_containers*/,
+    ContainersByName* containers_by_name) {
+  if (report->partial_record_check) {
+    for (const auto& pr : report->partial_record_check->entries) {
+      LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, 
pr.container);
+      if (c) {
+        (*containers_by_name)[pr.container] = c;
+      }
+    }
   }
 
-  // From here on out we're committed to repairing.
-
-  // Fetch all the containers we're going to need.
-  unordered_map<string, LogBlockContainerRefPtr> containers_by_name;
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-
-    // Remove all of the dead containers from the block manager. They will be
-    // deleted from disk shortly thereafter, outside of the lock.
-    for (const auto& d : dead_containers) {
-      CHECK(d->TrySetDead());
-      RemoveDeadContainerUnlocked(d->ToString());
-
-      // This must be true if clearing dead_containers below is to delete the
-      // container files.
-      DCHECK(d->HasOneRef());
+  if (report->full_container_space_check) {
+    for (const auto& fcp : report->full_container_space_check->entries) {
+      LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, 
fcp.container);
+      if (c) {
+        (*containers_by_name)[fcp.container] = c;
+      }
     }
+  }
+}
 
-    // Fetch all the containers we're going to need.
-    if (report->partial_record_check) {
-      for (const auto& pr : report->partial_record_check->entries) {
-        LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
-                                                  pr.container);
-        if (c) {
-          containers_by_name[pr.container] = c;
-        }
-      }
+void LogBlockManagerNativeMeta::FindContainersToRepair(
+    FsReport* report,
+    const ContainerBlocksByName& low_live_block_containers,
+    ContainersByName* containers_by_name) {
+  LogBlockManager::FindContainersToRepair(report, low_live_block_containers, 
containers_by_name);
+  for (const auto& e : low_live_block_containers) {
+    LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_, 
e.first);
+    if (c) {
+      (*containers_by_name)[e.first] = c;
     }
-    if (report->full_container_space_check) {
-      for (const auto& fcp : report->full_container_space_check->entries) {
-        LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
-                                                  fcp.container);
-        if (c) {
-          containers_by_name[fcp.container] = c;
-        }
+  }
+}
+
+
+Status LogBlockManager::DoRepair(
+    Dir* /*dir*/,
+    FsReport* report,
+    const ContainerBlocksByName& /*low_live_block_containers*/,
+    const ContainersByName& containers_by_name) {
+  // Truncate any excess preallocated space in full containers.
+  //
+  // This is a non-fatal inconsistency; we can just as easily ignore the extra
+  // disk space consumption.
+  if (report->full_container_space_check) {
+    for (auto& fcp : report->full_container_space_check->entries) {
+      LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, 
fcp.container);
+      if (!container) {
+        // The container was deleted outright.
+        fcp.repaired = true;
+        continue;
       }
-    }
-    for (const auto& e : low_live_block_containers) {
-      LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
-                                                e.first);
-      if (c) {
-        containers_by_name[e.first] = c;
+
+      Status s = container->TruncateDataToNextBlockOffset();
+      if (s.ok()) {
+        fcp.repaired = true;
       }
+      WARN_NOT_OK(s, "could not truncate excess preallocated space");
     }
   }
 
-  // Clearing this vector drops the last references to the containers within,
-  // triggering their deletion.
-  dead_containers.clear();
+  return Status::OK();
+}
 
+Status LogBlockManagerNativeMeta::DoRepair(
+    Dir* dir,
+    FsReport* report,
+    const ContainerBlocksByName& low_live_block_containers,
+    const ContainersByName& containers_by_name) {
+  RETURN_NOT_OK(LogBlockManager::DoRepair(dir, report, 
low_live_block_containers,
+                                          containers_by_name));
   // Truncate partial metadata records.
   //
   // This is a fatal inconsistency; if the repair fails, we cannot proceed.
@@ -3155,18 +3289,19 @@ Status LogBlockManager::Repair(
           "could not reopen container to truncate partial metadata record");
 
       RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Truncate(pr.offset),
-          "could not truncate partial metadata record");
+                                             "could not truncate partial 
metadata record");
 
       // Technically we've "repaired" the inconsistency if the truncation
       // succeeded, even if the following logic fails.
       pr.repaired = true;
 
-      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(file->Close(),
-          "could not close container after truncating partial metadata 
record");
+      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(
+          file->Close(), "could not close container after truncating partial 
metadata record");
 
       // Reopen the PB writer so that it will refresh its metadata about the
       // underlying file and resume appending to the new end of the file.
-      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(container->ReopenMetadataWriter(),
+      RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(
+          
down_cast<LogBlockContainerNativeMeta*>(container.get())->ReopenMetadataWriter(),
           "could not reopen container metadata file");
     }
   }
@@ -3181,7 +3316,6 @@ Status LogBlockManager::Repair(
       if (!s.ok() && !s.IsNotFound()) {
         WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container 
metadata file");
       }
-
       s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix));
       if (!s.ok() && !s.IsNotFound()) {
         WARN_NOT_OK_LBM_DISK_FAILURE(s, "could not delete incomplete container 
data file");
@@ -3190,42 +3324,6 @@ Status LogBlockManager::Repair(
     }
   }
 
-  // Truncate any excess preallocated space in full containers.
-  //
-  // This is a non-fatal inconsistency; we can just as easily ignore the extra
-  // disk space consumption.
-  if (report->full_container_space_check) {
-    for (auto& fcp : report->full_container_space_check->entries) {
-      LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, 
fcp.container);
-      if (!container) {
-        // The container was deleted outright.
-        fcp.repaired = true;
-        continue;
-      }
-
-      Status s = container->TruncateDataToNextBlockOffset();
-      if (s.ok()) {
-        fcp.repaired = true;
-      }
-      WARN_NOT_OK(s, "could not truncate excess preallocated space");
-    }
-  }
-
-  // Repunch all requested holes. Any excess space reclaimed was already
-  // tracked by LBMFullContainerSpaceCheck.
-  //
-  // Register deletions to a single BlockDeletionTransaction. So, the repunched
-  // holes belonging to the same container can be coalesced.
-  auto transaction = std::make_shared<LogBlockDeletionTransaction>(this);
-  for (const auto& b : need_repunching) {
-    b->RegisterDeletion(transaction);
-    transaction->AddBlock(b);
-  }
-
-  // Clearing this vector drops the last references to the LogBlocks within,
-  // triggering the repunching operations.
-  need_repunching.clear();
-
   // "Compact" metadata files with few live blocks by rewriting them with only
   // the live block records.
   std::atomic<int64_t> metadata_files_compacted = 0;
@@ -3244,7 +3342,7 @@ Status LogBlockManager::Repair(
     }
 
     dir->ExecClosure([this, &metadata_files_compacted, &metadata_bytes_delta,
-        &seen_fatal_error, &first_fatal_error, e, container]() {
+                         &seen_fatal_error, &first_fatal_error, e, 
container]() {
       // Rewrite this metadata file.
       int64_t file_bytes_delta;
       const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
@@ -3256,7 +3354,7 @@ Status LogBlockManager::Repair(
       }
 
       // Reopen the new metadata file.
-      s = container->ReopenMetadataWriter();
+      s = 
down_cast<LogBlockContainerNativeMeta*>(container.get())->ReopenMetadataWriter();
       if (!s.ok()) {
         // Open the new metadata file failure is fatal, stop processing other 
containers.
         bool current_seen_error = false;
@@ -3299,6 +3397,66 @@ Status LogBlockManager::Repair(
   return Status::OK();
 }
 
+Status LogBlockManager::Repair(
+    Dir* dir,
+    FsReport* report,
+    vector<LogBlockRefPtr> need_repunching,
+    vector<LogBlockContainerRefPtr> dead_containers,
+    const ContainerBlocksByName& low_live_block_containers) {
+  if (opts_.read_only) {
+    LOG(INFO) << "Read-only block manager, skipping repair";
+    return Status::OK();
+  }
+  if (report->HasFatalErrors()) {
+    LOG(WARNING) << "Found fatal and irreparable errors, skipping repair";
+    return Status::OK();
+  }
+
+  // From here on out we're committed to repairing.
+  ContainersByName containers_by_name;
+  {
+    std::lock_guard <simple_spinlock> l(lock_);
+
+    // 1. Remove all of the dead containers from the block manager. They will be
+    // deleted from disk shortly thereafter, outside of the lock.
+    for (const auto& d : dead_containers) {
+      CHECK(d->TrySetDead());
+      RemoveDeadContainerUnlocked(d->ToString());
+
+      // This must be true if clearing dead_containers below is to delete the
+      // container files.
+      DCHECK(d->HasOneRef());
+    }
+
+    // 2. Fetch all the containers we're going to need.
+    FindContainersToRepair(report, low_live_block_containers, 
&containers_by_name);
+  }
+
+  // 3. Clearing this vector drops the last references to the containers within,
+  // triggering their deletion.
+  dead_containers.clear();
+
+  // 4. Repair the containers according to 'report'.
+  RETURN_NOT_OK(DoRepair(dir, report, low_live_block_containers, 
containers_by_name));
+
+  // 5. Repunch all requested holes. Any excess space reclaimed was already
+  // tracked by LBMFullContainerSpaceCheck.
+  //
+  // Register deletions to a single BlockDeletionTransaction. So, the repunched
+  // holes belonging to the same container can be coalesced.
+  auto transaction = std::make_shared<LogBlockDeletionTransaction>(this);
+  for (const auto& b : need_repunching) {
+    b->RegisterDeletion(transaction);
+    transaction->AddBlock(b);
+  }
+
+  // 6. Clearing this vector drops the last references to the LogBlocks within,
+  // triggering the repunching operations.
+  need_repunching.clear();
+
+  return Status::OK();
+}
+
 Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
                                             const vector<BlockRecordPB>& 
records,
                                             int64_t* file_bytes_delta) {
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 0e5fffab8..830d1798f 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -57,6 +57,7 @@ struct FsReport;
 namespace internal {
 class LogBlock;
 class LogBlockContainer;
+class LogBlockContainerNativeMeta;
 class LogBlockDeletionTransaction;
 class LogWritableBlock;
 struct LogBlockContainerLoadResult;
@@ -65,6 +66,9 @@ struct LogBlockManagerMetrics;
 
 typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr;
 typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr;
+typedef scoped_refptr<internal::LogBlockContainerNativeMeta> 
LogBlockContainerNativeMetaRefPtr;
+typedef std::unordered_map<std::string, std::vector<BlockRecordPB>> 
ContainerBlocksByName;
+typedef std::unordered_map<std::string, LogBlockContainerRefPtr> 
ContainersByName;
 
 // A log-backed (i.e. sequentially allocated file) block storage
 // implementation.
@@ -205,7 +209,6 @@ class LogBlockManager : public BlockManager {
                   FileCache* file_cache,
                   BlockManagerOptions opts);
 
- private:
   FRIEND_TEST(LogBlockManagerTest, TestAbortBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCloseFinalizedBlock);
   FRIEND_TEST(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup);
@@ -220,6 +223,7 @@ class LogBlockManager : public BlockManager {
   FRIEND_TEST(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer);
 
   friend class internal::LogBlockContainer;
+  friend class internal::LogBlockContainerNativeMeta;
   friend class internal::LogBlockDeletionTransaction;
   friend class internal::LogWritableBlock;
   friend class LogBlockManagerTest;
@@ -253,6 +257,22 @@ class LogBlockManager : public BlockManager {
       BlockIdHash,
       BlockIdEqual> BlockRecordMap;
 
+  // Creates a new LogBlockContainer in 'dir'. 'container' is set to the newly created
+  // container only if it was created successfully.
+  //
+  // Returns Status::OK() if created successfully; otherwise returns an error.
+  virtual Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) 
= 0;
+
+  // Opens an existing LogBlockContainer in 'dir' identified by 'id'. 'container' is set to the
+  // opened container only if it was opened successfully.
+  //
+  // Returns Status::OK() if opened successfully; otherwise returns an error, which is also
+  // recorded in 'report'.
+  virtual Status OpenContainer(Dir* dir,
+                               FsReport* report,
+                               const std::string& id,
+                               LogBlockContainerRefPtr* container) = 0;
+
   // Adds an as of yet unseen container to this block manager.
   //
   // Must be called with 'lock_' held.
@@ -340,9 +360,26 @@ class LogBlockManager : public BlockManager {
                 FsReport* report,
                 std::vector<LogBlockRefPtr> need_repunching,
                 std::vector<LogBlockContainerRefPtr> dead_containers,
-                std::unordered_map<
-                    std::string,
-                    std::vector<BlockRecordPB>> low_live_block_containers);
+                const ContainerBlocksByName& low_live_block_containers);
+
+  // Fetch all the containers we're going to repair. Repair() calls this with 'lock_' held.
+  //
+  // The issues to repair are described in 'report' and 'low_live_block_containers'; the
+  // containers are returned via 'containers_by_name'.
+  virtual void FindContainersToRepair(FsReport* report,
+                                      const ContainerBlocksByName& 
low_live_block_containers,
+                                      ContainersByName* containers_by_name);
+
+  // Do the repair work.
+  //
+  // The following repairs will be performed:
+  // 1. Container data files in 'report->full_container_space_check' will be truncated.
+  //
+  // Returns an error if repairing a fatal inconsistency failed.
+  virtual Status DoRepair(Dir* dir,
+                          FsReport* report,
+                          const ContainerBlocksByName& 
low_live_block_containers,
+                          const ContainersByName& containers_by_name);
 
   // Rewrites a container metadata file, adding all entries in 'records'.
   // The new metadata file is created as a temporary file and renamed over the
@@ -498,6 +535,35 @@ class LogBlockManagerNativeMeta : public LogBlockManager {
                             BlockManagerOptions opts)
       : LogBlockManager(env, dd_manager, error_manager, file_cache, 
std::move(opts)) {
   }
+
+ private:
+  Status CreateContainer(Dir* dir, LogBlockContainerRefPtr* container) 
override;
+
+  // Returns Status::Aborted() in the case that the metadata and data files
+  // both appear to have no data (e.g. due to a crash just after creating
+  // one of them but before writing any records).
+  Status OpenContainer(Dir* dir,
+                       FsReport* report,
+                       const std::string& id,
+                       LogBlockContainerRefPtr* container) override;
+
+  void FindContainersToRepair(FsReport* report,
+                              const ContainerBlocksByName& 
low_live_block_containers,
+                              ContainersByName* containers_by_name) override;
+
+  // Do the repair work.
+  //
+  // The following repairs will be performed:
+  // 1. Repairs in LogBlockManager::DoRepair().
+  // 2. Container metadata files in 'report->partial_record_check' will be truncated.
+  // 3. Container data and metadata files in 'report->incomplete_container_check' will be deleted.
+  // 4. Container metadata files in 'low_live_block_containers' will be compacted.
+  //
+  // Returns an error if repairing a fatal inconsistency failed.
+  Status DoRepair(Dir* dir,
+                  FsReport* report,
+                  const ContainerBlocksByName& low_live_block_containers,
+                  const ContainersByName& containers_by_name) override;
 };
 
 } // namespace fs

Reply via email to