This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c5d195c3 KUDU-3371 [fs] make LBMCorruptor a base class
8c5d195c3 is described below

commit 8c5d195c340360dd962207eaa6969604c65ba9c1
Author: Yingchun Lai <[email protected]>
AuthorDate: Sat Sep 23 23:58:31 2023 +0800

    KUDU-3371 [fs] make LBMCorruptor a base class
    
    This patch has no functional changes, but just makes
    LBMCorruptor a base class for the newly added
    NativeMetadataLBMCorruptor.
    
    NativeMetadataLBMCorruptor is a test util to corrupt various
    on-disk data structures for the native metadata based log
    block container LogBlockContainerNativeMeta.
    
    This is a previous patch before introducing the RocksDB based
    log block container, a new derived class from LBMCorruptor will
    be introduced then.
    
    Change-Id: I5a1f2842598f46e88fbd08273d8fd19ed34a9cc5
    Reviewed-on: http://gerrit.cloudera.org:8080/20504
    Reviewed-by: Yifan Zhang <[email protected]>
    Tested-by: Yingchun Lai <[email protected]>
---
 src/kudu/fs/block_manager-stress-test.cc   |   6 +-
 src/kudu/fs/log_block_manager-test-util.cc | 379 ++++++++++++++++++-----------
 src/kudu/fs/log_block_manager-test-util.h  |  71 +++++-
 src/kudu/fs/log_block_manager-test.cc      |  43 ++--
 4 files changed, 314 insertions(+), 185 deletions(-)

diff --git a/src/kudu/fs/block_manager-stress-test.cc 
b/src/kudu/fs/block_manager-stress-test.cc
index dd240b664..659234887 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -498,11 +498,11 @@ void 
BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
 
 template <>
 void 
BlockManagerStressTest<LogBlockManagerNativeMeta>::InjectNonFatalInconsistencies()
 {
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), rand_seed_);
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
rand_seed_);
+  ASSERT_OK(corruptor->Init());
 
   for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
-    ASSERT_OK(corruptor.InjectRandomNonFatalInconsistency());
+    ASSERT_OK(corruptor->InjectRandomNonFatalInconsistency());
   }
 }
 
diff --git a/src/kudu/fs/log_block_manager-test-util.cc 
b/src/kudu/fs/log_block_manager-test-util.cc
index 071544451..12117b131 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -35,12 +35,14 @@
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/env.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+DECLARE_string(block_manager);
 DECLARE_uint64(log_container_max_size);
 
 namespace kudu {
@@ -52,6 +54,46 @@ using std::string;
 using std::vector;
 using std::unique_ptr;
 using std::unordered_map;
+using strings::Substitute;
+
+// LBMCorruptor to trace the LogBlockManagerNativeMeta data corruption.
+class NativeMetadataLBMCorruptor : public LBMCorruptor {
+ public:
+  NativeMetadataLBMCorruptor(Env* env, vector<string> data_dirs, uint32_t 
rand_seed)
+      : LBMCorruptor(env, std::move(data_dirs), rand_seed) {
+    max_malformed_types_ = MalformedRecordType::kTwoCreatesForSameBlockId;
+  }
+
+ private:
+  Status CreateIncompleteContainer() override;
+
+  Status AppendRecord(const Container* c, BlockId block_id,
+                      int64_t block_offset, int64_t block_length) override;
+
+  Status AppendCreateRecord(const Container* c, BlockId block_id,
+                            uint64_t block_offset, int64_t block_length) 
override;
+
+  Status AppendPartialRecord(const Container* c, BlockId block_id) override;
+
+  Status AppendMetadataRecord(const Container* c, const BlockRecordPB& record) 
override;
+
+  Status CreateContainerMetadataPart(const string& unsuffixed_path) const;
+
+  static Status AppendCreateRecordInternal(WritablePBContainerFile* writer, 
BlockId block_id,
+                                           int64_t block_offset, int64_t 
block_length);
+  static Status AppendDeleteRecordInternal(WritablePBContainerFile* writer, 
BlockId block_id);
+
+  Status CreateMalformedRecord(
+      const Container* c, MalformedRecordType error_type, BlockRecordPB* 
record) override;
+};
+
+unique_ptr<LBMCorruptor> LBMCorruptor::Create(
+    Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed) {
+  if (FLAGS_block_manager == "log") {
+    return std::make_unique<NativeMetadataLBMCorruptor>(env, 
std::move(data_dirs), rand_seed);
+  }
+  return nullptr;
+}
 
 LBMCorruptor::LBMCorruptor(Env* env, vector<string> data_dirs, uint32_t 
rand_seed)
     : env_(env),
@@ -74,10 +116,12 @@ Status LBMCorruptor::Init() {
       string stripped;
       if (TryStripSuffixString(
           f, LogBlockManager::kContainerDataFileSuffix, &stripped)) {
+        containers_by_name[stripped].dir = dd;
         containers_by_name[stripped].name = stripped;
         containers_by_name[stripped].data_filename = JoinPathSegments(dd, f);
       } else if (TryStripSuffixString(
           f, LogBlockManager::kContainerMetadataFileSuffix, &stripped)) {
+        containers_by_name[stripped].dir = dd;
         containers_by_name[stripped].name = stripped;
         containers_by_name[stripped].metadata_filename = JoinPathSegments(dd, 
f);
       }
@@ -108,11 +152,12 @@ Status LBMCorruptor::Init() {
 Status LBMCorruptor::PreallocateFullContainer() {
   const int kPreallocateBytes = 16 * 1024;
   const Container* c = nullptr;
-  RETURN_NOT_OK(GetRandomContainer(FULL, &c));
+  RETURN_NOT_OK(GetRandomContainer(FULL, &c)); // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
+  CHECK_NE(c, nullptr);
 
   // Pick one of the preallocation modes at random; both are recoverable.
   RWFile::PreAllocateMode mode;
-  int r = rand_.Uniform(2);
+  auto r = rand_.Uniform(2);
   if (r == 0) {
     mode = RWFile::CHANGE_FILE_SIZE;
   } else {
@@ -124,8 +169,9 @@ Status LBMCorruptor::PreallocateFullContainer() {
   RWFileOptions opts;
   opts.mode = Env::MUST_EXIST;
   opts.is_sensitive = true;
-  RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
-  int64_t initial_size;
+  RETURN_NOT_OK(env_->NewRWFile( // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
+      opts, c->data_filename, &data_file));
+  int64_t initial_size = 0;
   RETURN_NOT_OK(PreallocateForBlock(data_file.get(), mode,
                                     kPreallocateBytes, &initial_size));
   if (mode == RWFile::DONT_CHANGE_FILE_SIZE) {
@@ -146,7 +192,8 @@ Status LBMCorruptor::AddUnpunchedBlockToFullContainer() {
   RETURN_NOT_OK(GetRandomContainer(FULL, &c));
 
   uint64_t fs_block_size;
-  RETURN_NOT_OK(env_->GetBlockSize(c->data_filename, &fs_block_size));
+  RETURN_NOT_OK(env_->GetBlockSize( // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
+      c->data_filename, &fs_block_size));
 
   // "Write" out the block by growing the data file by some random amount.
   //
@@ -157,69 +204,32 @@ Status LBMCorruptor::AddUnpunchedBlockToFullContainer() {
   opts.is_sensitive = true;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   int64_t block_length = (rand_.Uniform(16) + 1) * fs_block_size;
-  int64_t initial_data_size;
+  int64_t initial_data_size = 0;
   RETURN_NOT_OK(PreallocateForBlock(data_file.get(), RWFile::CHANGE_FILE_SIZE,
                                     block_length, &initial_data_size));
   RETURN_NOT_OK(data_file->Close());
 
   // Having written out the block, write both CREATE and DELETE metadata
   // records for it.
-  unique_ptr<WritablePBContainerFile> metadata_writer;
-  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
   BlockId block_id(rand_.Next64());
-  RETURN_NOT_OK(AppendCreateRecord(metadata_writer.get(), block_id,
-                                   initial_data_size, block_length));
-  RETURN_NOT_OK(AppendDeleteRecord(metadata_writer.get(), block_id));
-
+  RETURN_NOT_OK(AppendRecord(c, block_id, initial_data_size, block_length));
   LOG(INFO) << "Added unpunched block to full container " << c->name;
-  return metadata_writer->Close();
-}
 
-Status LBMCorruptor::CreateIncompleteContainer() {
-  unique_ptr<RWFile> data_file;
-  unique_ptr<RWFile> metadata_file;
-  string unsuffixed_path = JoinPathSegments(GetRandomDataDir(),
-                                            oid_generator_.Next());
-  string data_fname = StrCat(
-      unsuffixed_path, LogBlockManager::kContainerDataFileSuffix);
-  string metadata_fname = StrCat(
-      unsuffixed_path, LogBlockManager::kContainerMetadataFileSuffix);
+  return Status::OK();
+}
 
-  // Create an incomplete container. Kinds of incomplete containers:
-  //
-  // 1. Empty data file but no metadata file.
-  // 2. No data file but metadata file exists (and is up to a certain size).
-  // 3. Empty data file and metadata file exists (and is up to a certain size).
-  int r = rand_.Uniform(3);
+Status LBMCorruptor::CreateContainerDataPart(const std::string& 
unsuffixed_path) {
   RWFileOptions opts;
   opts.is_sensitive = true;
-  if (r == 0) {
-    RETURN_NOT_OK(env_->NewRWFile(opts, data_fname, &data_file));
-  } else if (r == 1) {
-    RETURN_NOT_OK(env_->NewRWFile(opts, metadata_fname, &data_file));
-  } else {
-    CHECK_EQ(r, 2);
-    RETURN_NOT_OK(env_->NewRWFile(opts, data_fname, &data_file));
-    RETURN_NOT_OK(env_->NewRWFile(opts, metadata_fname, &data_file));
-  }
-
-  if (data_file) {
-    RETURN_NOT_OK(data_file->Close());
-  }
-
-  if (metadata_file) {
-    int md_length = rand_.Uniform(pb_util::kPBContainerMinimumValidLength);
-    RETURN_NOT_OK(metadata_file->Truncate(md_length));
-    RETURN_NOT_OK(metadata_file->Close());
-  }
-
-  LOG(INFO) << "Created incomplete container " << unsuffixed_path;
-  return Status::OK();
+  unique_ptr<RWFile> data_file;
+  string data_fname = StrCat(unsuffixed_path, 
LogBlockManager::kContainerDataFileSuffix);
+  RETURN_NOT_OK(env_->NewRWFile(opts, data_fname, &data_file));
+  return data_file->Close();
 }
 
 Status LBMCorruptor::AddMalformedRecordToContainer() {
   const int kBlockSize = 16 * 1024;
-  const Container* c;
+  const Container* c = nullptr;
   RETURN_NOT_OK(GetRandomContainer(ANY, &c));
 
   // Ensure the container's data file has enough space for the new block. We're
@@ -232,7 +242,8 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
     opts.is_sensitive = true;
-    RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile( // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
+        opts, c->data_filename, &data_file));
     RETURN_NOT_OK(PreallocateForBlock(data_file.get(), 
RWFile::CHANGE_FILE_SIZE,
                                       kBlockSize, &initial_data_size));
     RETURN_NOT_OK(data_file->Close());
@@ -247,52 +258,49 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
   record.set_length(kBlockSize);
   record.set_timestamp_us(0);
 
-  unique_ptr<WritablePBContainerFile> metadata_writer;
-  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
-
-  // Corrupt the record in some way. Kinds of malformed records (as per the
-  // malformed record checking code in log_block_manager.cc):
-  //
-  // 0. No block offset.
-  // 1. No block length.
-  // 2. Negative block offset.
-  // 3. Negative block length.
-  // 4. Offset + length > data file size.
-  // 5. Two CREATEs for same block ID.
-  // 6. DELETE without first matching CREATE.
-  // 7. Unrecognized op type.
-  int r = rand_.Uniform(8);
-  if (r == 0) {
-    record.clear_offset();
-  } else if (r == 1) {
-    record.clear_length();
-  } else if (r == 2) {
-    record.set_offset(-1);
-  } else if (r == 3) {
-    record.set_length(-1);
-  } else if (r == 4) {
-    record.set_offset(kint64max / 2);
-  } else if (r == 5) {
-    RETURN_NOT_OK(metadata_writer->Append(record));
-  } else if (r == 6) {
-    record.clear_offset();
-    record.clear_length();
-    record.set_op_type(DELETE);
-  } else {
-    CHECK_EQ(r, 7);
-    record.set_op_type(UNKNOWN);
-  }
+  MalformedRecordType r = static_cast<MalformedRecordType>(
+      rand_.Uniform(static_cast<uint32_t>(max_malformed_types_) + 1));
+  RETURN_NOT_OK(CreateMalformedRecord(c, r, &record));
+  RETURN_NOT_OK(AppendMetadataRecord(c, record));
 
   LOG(INFO) << "Added malformed record to container " << c->name;
-  return metadata_writer->Append(record);
+  return Status::OK();
+}
+
+Status LBMCorruptor::CreateMalformedRecord(
+    const Container* /* c */, MalformedRecordType error_type, BlockRecordPB* 
record) {
+  switch (error_type) {
+    case MalformedRecordType::kNoBlockOffset:
+      record->clear_offset();
+      break;
+    case MalformedRecordType::kNoBlockLength:
+      record->clear_length();
+      break;
+    case MalformedRecordType::kNegativeBlockOffset:
+      record->set_offset(-1);
+      break;
+    case MalformedRecordType::kNegativeBlockLength:
+      record->set_length(-1);
+      break;
+    case MalformedRecordType::kMetadataSizeLargerThanDataSize:
+      record->set_offset(kint64max / 2);
+      break;
+    case MalformedRecordType::kUnrecognizedOpType:
+      record->set_op_type(UNKNOWN);
+      break;
+    default:
+      LOG(FATAL) << "Unexpected value " << static_cast<uint32_t>(error_type);
+  }
+  return Status::OK();
 }
 
 Status LBMCorruptor::AddMisalignedBlockToContainer() {
-  const Container* c;
+  const Container* c = nullptr;
   RETURN_NOT_OK(GetRandomContainer(ANY, &c));
 
   uint64_t fs_block_size;
-  RETURN_NOT_OK(env_->GetBlockSize(c->data_filename, &fs_block_size));
+  RETURN_NOT_OK(env_->GetBlockSize( // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
+      c->data_filename, &fs_block_size));
 
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
@@ -341,44 +349,26 @@ Status LBMCorruptor::AddMisalignedBlockToContainer() {
   RETURN_NOT_OK(data_file->Close());
 
   // Having written out the block, write a corresponding metadata record.
-  unique_ptr<WritablePBContainerFile> metadata_writer;
-  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
-  RETURN_NOT_OK(AppendCreateRecord(metadata_writer.get(), block_id,
-                                   block_offset, block_length));
+  RETURN_NOT_OK(AppendCreateRecord(c, block_id, block_offset, block_length));
 
-  LOG(INFO) << "Added misaligned block to container " << c->name;
-  return metadata_writer->Close();
+  LOG(INFO) << Substitute("Added misaligned block $0 to container $1",
+                          block_id.ToString(), c->name);
+  return Status::OK();
 }
 
 Status LBMCorruptor::AddPartialRecordToContainer() {
-  const Container* c;
+  const Container* c = nullptr;
   RETURN_NOT_OK(GetRandomContainer(ANY, &c));
 
-  unique_ptr<WritablePBContainerFile> metadata_writer;
-  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
-
-  // Add a new good record to the container.
-  RETURN_NOT_OK(AppendCreateRecord(metadata_writer.get(),
-                                   BlockId(rand_.Next64()),
-                                   0, 0));
-
-  // Corrupt the record by truncating one byte off the end of it.
-  {
-    RWFileOptions opts;
-    opts.mode = Env::MUST_EXIST;
-    opts.is_sensitive = true;
-    unique_ptr<RWFile> metadata_file;
-    RETURN_NOT_OK(env_->NewRWFile(opts, c->metadata_filename, &metadata_file));
-    uint64_t initial_metadata_size;
-    RETURN_NOT_OK(metadata_file->Size(&initial_metadata_size));
-    RETURN_NOT_OK(metadata_file->Truncate(initial_metadata_size - 1));
-  }
+  BlockId block_id = BlockId(rand_.Next64());
+  RETURN_NOT_OK(
+      AppendPartialRecord(c, block_id)); // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
 
   // Once a container has a partial record, it cannot be further corrupted by
   // the corruptor.
 
   // Make a local copy of the container's name; erase() below will free it.
-  string container_name = c->name;
+  string container_name = c->name; // 
NOLINT(clang-analyzer-core.NonNullParamChecker)
 
   auto remove_matching_container = [&](const Container& e) {
     return container_name == e.name;
@@ -398,28 +388,29 @@ Status LBMCorruptor::AddPartialRecordToContainer() {
 
 Status LBMCorruptor::InjectRandomNonFatalInconsistency() {
   while (true) {
-    int r = rand_.Uniform(5);
+    NonFatalInconsistency r = static_cast<NonFatalInconsistency>(rand_.Uniform(
+        
static_cast<uint32_t>(NonFatalInconsistency::kMaxNonFatalInconsistency)));
     switch (r) {
-      case 0:
+      case NonFatalInconsistency::kMisalignedBlockToContainer:
         return AddMisalignedBlockToContainer();
-      case 1:
+      case NonFatalInconsistency::kIncompleteContainer:
         return CreateIncompleteContainer();
-      case 2:
+      case NonFatalInconsistency::kPreallocateFullContainer:
         if (full_containers_.empty()) {
           // Loop and try a different operation.
           break;
         }
         return PreallocateFullContainer();
-      case 3:
+      case NonFatalInconsistency::kUnpunchedBlockToFullContainer:
         if (full_containers_.empty()) {
           // Loop and try a different operation.
           break;
         }
         return AddUnpunchedBlockToFullContainer();
-      case 4:
+      case NonFatalInconsistency::kPartialRecordToContainer:
         return AddPartialRecordToContainer();
       default:
-        LOG(FATAL) << "Unexpected value " << r;
+        LOG(FATAL) << "Unexpected value " << static_cast<uint32_t>(r);
     }
   }
 }
@@ -442,28 +433,6 @@ Status LBMCorruptor::OpenMetadataWriter(
   return Status::OK();
 }
 
-Status LBMCorruptor::AppendCreateRecord(WritablePBContainerFile* writer,
-                                        BlockId block_id,
-                                        int64_t block_offset,
-                                        int64_t block_length) {
-  BlockRecordPB record;
-  block_id.CopyToPB(record.mutable_block_id());
-  record.set_op_type(CREATE);
-  record.set_offset(block_offset);
-  record.set_length(block_length);
-  record.set_timestamp_us(0); // has no effect
-  return writer->Append(record);
-}
-
-Status LBMCorruptor::AppendDeleteRecord(WritablePBContainerFile* writer,
-                                        BlockId block_id) {
-  BlockRecordPB record;
-  block_id.CopyToPB(record.mutable_block_id());
-  record.set_op_type(DELETE);
-  record.set_timestamp_us(0); // has no effect
-  return writer->Append(record);
-}
-
 Status LBMCorruptor::PreallocateForBlock(RWFile* data_file,
                                          RWFile::PreAllocateMode mode,
                                          int64_t block_length,
@@ -498,5 +467,121 @@ const string& LBMCorruptor::GetRandomDataDir() const {
   return data_dirs_[rand_.Uniform(data_dirs_.size())];
 }
 
+Status NativeMetadataLBMCorruptor::CreateIncompleteContainer() {
+  string unsuffixed_path = JoinPathSegments(GetRandomDataDir(), 
oid_generator_.Next());
+  // Create an incomplete container. Kinds of incomplete containers:
+  //
+  // 0. Empty data file but no metadata file.
+  // 1. No data file but metadata file exists (and is up to a certain size).
+  // 2. Empty data file and metadata file exists (and is up to a certain size).
+  auto r = rand_.Uniform(3);
+  if (r == 0) {
+    RETURN_NOT_OK(CreateContainerDataPart(unsuffixed_path));
+  } else if (r == 1) {
+    RETURN_NOT_OK(CreateContainerMetadataPart(unsuffixed_path));
+  } else {
+    CHECK_EQ(r, 2);
+    RETURN_NOT_OK(CreateContainerDataPart(unsuffixed_path));
+    RETURN_NOT_OK(CreateContainerMetadataPart(unsuffixed_path));
+  }
+  LOG(INFO) << "Created incomplete container " << unsuffixed_path;
+  return Status::OK();
+}
+
+Status NativeMetadataLBMCorruptor::AppendRecord(
+    const Container* c, BlockId block_id, int64_t block_offset, int64_t 
block_length) {
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+  RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id,
+                                           block_offset, block_length));
+  RETURN_NOT_OK(AppendDeleteRecordInternal(metadata_writer.get(), block_id));
+  return metadata_writer->Close();
+}
+
+Status NativeMetadataLBMCorruptor::AppendCreateRecord(
+    const Container* c, BlockId block_id, uint64_t block_offset, int64_t 
block_length) {
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+  RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id,
+                                           block_offset, block_length));
+  return metadata_writer->Close();
+}
+
+Status NativeMetadataLBMCorruptor::AppendPartialRecord(
+    const Container* c, BlockId block_id) {
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+  // Add a new good record to the container.
+  RETURN_NOT_OK(AppendCreateRecordInternal(metadata_writer.get(), block_id, 0, 
0));
+  // Corrupt the record by truncating one byte off the end of it.
+  RWFileOptions opts;
+  opts.mode = Env::MUST_EXIST;
+  opts.is_sensitive = true;
+  unique_ptr<RWFile> metadata_file;
+  RETURN_NOT_OK(env_->NewRWFile(opts, c->metadata_filename, &metadata_file));
+  uint64_t initial_metadata_size;
+  RETURN_NOT_OK(metadata_file->Size(&initial_metadata_size));
+  return metadata_file->Truncate(initial_metadata_size - 1);
+}
+
+Status NativeMetadataLBMCorruptor::AppendMetadataRecord(
+    const Container* c, const BlockRecordPB& record) {
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+  return metadata_writer->Append(record);
+}
+
+Status NativeMetadataLBMCorruptor::CreateContainerMetadataPart(
+    const string& unsuffixed_path) const {
+  RWFileOptions opts;
+  opts.is_sensitive = true;
+  unique_ptr<RWFile> metadata_file;
+  string metadata_fname = StrCat(
+      unsuffixed_path, LogBlockManager::kContainerMetadataFileSuffix);
+  RETURN_NOT_OK(env_->NewRWFile(opts, metadata_fname, &metadata_file));
+  // Truncate the metadata file to a small random length.
+  int md_length = rand_.Uniform(pb_util::kPBContainerMinimumValidLength);
+  RETURN_NOT_OK(metadata_file->Truncate(md_length));
+  return metadata_file->Close();
+}
+
+Status NativeMetadataLBMCorruptor::CreateMalformedRecord(
+    const Container* c, MalformedRecordType error_type, BlockRecordPB* record) 
{
+  switch (error_type) {
+    case MalformedRecordType::kDeleteWithoutFirstMatchingCreate:
+      record->clear_offset();
+      record->clear_length();
+      record->set_op_type(DELETE);
+      break;
+    case MalformedRecordType::kTwoCreatesForSameBlockId:
+      RETURN_NOT_OK(AppendMetadataRecord(c, *record));
+      break;
+    default:
+      return LBMCorruptor::CreateMalformedRecord(c, error_type, record);
+  }
+  return Status::OK();
+}
+
+Status NativeMetadataLBMCorruptor::AppendCreateRecordInternal(
+    WritablePBContainerFile* writer, BlockId block_id,
+    int64_t block_offset, int64_t block_length) {
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(block_offset);
+  record.set_length(block_length);
+  record.set_timestamp_us(0); // has no effect
+  return writer->Append(record);
+}
+
+Status NativeMetadataLBMCorruptor::AppendDeleteRecordInternal(
+    WritablePBContainerFile* writer, BlockId block_id) {
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(DELETE);
+  record.set_timestamp_us(0); // has no effect
+  return writer->Append(record);
+}
+
 } // namespace fs
 } // namespace kudu
diff --git a/src/kudu/fs/log_block_manager-test-util.h 
b/src/kudu/fs/log_block_manager-test-util.h
index 7cafbecde..ff3dfd20b 100644
--- a/src/kudu/fs/log_block_manager-test-util.h
+++ b/src/kudu/fs/log_block_manager-test-util.h
@@ -30,6 +30,7 @@
 namespace kudu {
 
 class BlockId;
+class BlockRecordPB;
 
 namespace pb_util {
 class WritablePBContainerFile;
@@ -40,7 +41,11 @@ namespace fs {
 // Corrupts various log block manager on-disk data structures.
 class LBMCorruptor {
  public:
-  LBMCorruptor(Env* env, std::vector<std::string> data_dirs, uint32_t 
rand_seed);
+  // Create a LBMCorruptor according to the --block_manager flag.
+  static std::unique_ptr<LBMCorruptor> Create(
+      Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed);
+
+  virtual ~LBMCorruptor() = default;
 
   // Initializes a the corruptor, parsing all data directories for containers.
   //
@@ -63,7 +68,7 @@ class LBMCorruptor {
 
   // Creates a new incomplete container. This inconsistency is non-fatal and
   // repairable.
-  Status CreateIncompleteContainer();
+  virtual Status CreateIncompleteContainer() = 0;
 
   // Adds a malformed record to a container (chosen at random). This
   // inconsistency is fatal and irreparable.
@@ -87,12 +92,23 @@ class LBMCorruptor {
   // Returns an error if a container could not be found.
   Status AddPartialRecordToContainer();
 
+  enum class NonFatalInconsistency : uint32_t {
+    kMisalignedBlockToContainer = 0,
+    kIncompleteContainer,
+    kPreallocateFullContainer,
+    kUnpunchedBlockToFullContainer,
+    kPartialRecordToContainer,
+    kMaxNonFatalInconsistency
+  };
   // Injects one of the above non-fatal inconsistencies (chosen at random).
   Status InjectRandomNonFatalInconsistency();
 
- private:
+ protected:
+  LBMCorruptor(Env* env, std::vector<std::string> data_dirs, uint32_t 
rand_seed);
+
   // Describes an on-disk LBM container.
   struct Container {
+    std::string dir;
     std::string name;
     std::string data_filename;
     std::string metadata_filename;
@@ -103,16 +119,6 @@ class LBMCorruptor {
       const Container& container,
       std::unique_ptr<pb_util::WritablePBContainerFile>* writer);
 
-  // Appends a CREATE record to 'writer'.
-  static Status AppendCreateRecord(pb_util::WritablePBContainerFile* writer,
-                                   BlockId block_id,
-                                   int64_t block_offset,
-                                   int64_t block_length);
-
-  // Appends a DELETE record to 'writer'.
-  static Status AppendDeleteRecord(pb_util::WritablePBContainerFile* writer,
-                                   BlockId block_id);
-
   // Preallocates space at the end of a container's data file for a new block.
   //
   // On success, writes the initial data file's size to 'old_data_file_size'.
@@ -134,11 +140,50 @@ class LBMCorruptor {
   // Gets a data directory chosen at random.
   const std::string& GetRandomDataDir() const;
 
+  // Appends a CREATE-DELETE pair of records to the container 'c', the newly
+  // created record has a unique id of 'block_id' and is located at
+  // 'block_offset' with a size of 'block_length'.
+  virtual Status AppendRecord(const Container* c, BlockId block_id,
+                              int64_t block_offset, int64_t block_length) = 0;
+
+  // Similar to the above, but only appends the CREATE record.
+  virtual Status AppendCreateRecord(const Container* c, BlockId block_id,
+                                    uint64_t block_offset, int64_t 
block_length) = 0;
+
+  // Appends a partial CREATE record to the metadata part of container 'c', the
+  // newly created record has a unique id of 'block_id'. The record is 
corrupted
+  // by truncating one byte off the end of it.
+  virtual Status AppendPartialRecord(const Container* c, BlockId block_id) = 0;
+
+  // Appends a 'record' to the metadata part of container 'c'.
+  virtual Status AppendMetadataRecord(const Container* c, const BlockRecordPB& 
record) = 0;
+
+  // Corrupt the 'record' in some way. Kinds of malformed records (as per the
+  // malformed record checking code in log_block_manager.cc).
+  enum class MalformedRecordType : uint32_t {
+    // All LBMCorruptor derive classes have these kind of errors.
+    kNoBlockOffset = 0,
+    kNoBlockLength,
+    kNegativeBlockOffset,
+    kNegativeBlockLength,
+    kMetadataSizeLargerThanDataSize,
+    kUnrecognizedOpType,
+    // Only NativeMetadataLBMCorruptor class has these kind of errors.
+    kDeleteWithoutFirstMatchingCreate,
+    kTwoCreatesForSameBlockId
+  };
+  // Returns an error if a container could not be found.
+  virtual Status CreateMalformedRecord(
+      const Container* c, MalformedRecordType error_type, BlockRecordPB* 
record);
+
+  Status CreateContainerDataPart(const std::string& unsuffixed_path);
+
   // Initialized in the constructor.
   Env* env_;
   const std::vector<std::string> data_dirs_;
   mutable Random rand_;
   ObjectIdGenerator oid_generator_;
+  MalformedRecordType max_malformed_types_;
 
   // Initialized in Init().
   std::vector<Container> all_containers_;
diff --git a/src/kudu/fs/log_block_manager-test.cc 
b/src/kudu/fs/log_block_manager-test.cc
index 665d12d1c..05bbe9e43 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -188,7 +188,6 @@ class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterfac
                                                          
DataDirManagerOptions(), &dd_manager_));
       RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, 
test_group_pb_));
     }
-
     bm_ = CreateBlockManager(metric_entity, test_data_dirs);
     RETURN_NOT_OK(bm_->Open(report, BlockManager::MergeReport::NOT_REQUIRED, 
nullptr, nullptr));
     return Status::OK();
@@ -1390,12 +1389,12 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add a mixture of regular and misaligned blocks to it.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
   int num_misaligned_blocks = 0;
   for (int i = 0; i < kNumBlocks; i++) {
     if (rand() % 2) {
-      ASSERT_OK(corruptor.AddMisalignedBlockToContainer());
+      ASSERT_OK(corruptor->AddMisalignedBlockToContainer());
 
       // Need to reopen the block manager after each corruption because the
       // container metadata writers do not expect the metadata files to have
@@ -1508,9 +1507,9 @@ TEST_P(LogBlockManagerTest, 
TestRepairPreallocateExcessSpace) {
   NO_FATALS(GetContainerNames(&container_names));
 
   // Corrupt one container.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
-  ASSERT_OK(corruptor.PreallocateFullContainer());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
+  ASSERT_OK(corruptor->PreallocateFullContainer());
 
   // Check the report.
   FsReport report;
@@ -1553,10 +1552,10 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &initial_file_size_on_disk));
 
   // Add some "unpunched blocks" to the container.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumBlocks; i++) {
-    ASSERT_OK(corruptor.AddUnpunchedBlockToFullContainer());
+    ASSERT_OK(corruptor->AddUnpunchedBlockToFullContainer());
   }
 
   uint64_t file_size_on_disk;
@@ -1596,10 +1595,10 @@ TEST_P(LogBlockManagerTest, 
TestRepairIncompleteContainer) {
   // Create some incomplete containers. The corruptor will select between
   // several variants of "incompleteness" at random (see
   // LBMCorruptor::CreateIncompleteContainer() for details).
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumContainers; i++) {
-    ASSERT_OK(corruptor.CreateIncompleteContainer());
+    ASSERT_OK(corruptor->CreateIncompleteContainer());
   }
   vector<string> container_names;
   NO_FATALS(GetContainerNames(&container_names));
@@ -1635,10 +1634,10 @@ TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) 
{
   // Add some malformed records. The corruptor will select between
   // several variants of "malformedness" at random (see
   // LBMCorruptor::AddMalformedRecordToContainer for details).
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumRecords; i++) {
-    ASSERT_OK(corruptor.AddMalformedRecordToContainer());
+    ASSERT_OK(corruptor->AddMalformedRecordToContainer());
   }
 
   // Check the report.
@@ -1666,10 +1665,10 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) 
{
   NO_FATALS(GetOnlyContainer(&container_name));
 
   // Add some misaligned blocks.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumBlocks; i++) {
-    ASSERT_OK(corruptor.AddMisalignedBlockToContainer());
+    ASSERT_OK(corruptor->AddMisalignedBlockToContainer());
   }
 
   // Check the report.
@@ -1706,10 +1705,10 @@ TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
   ASSERT_EQ(kNumContainers, container_names.size());
 
   // Add some partial records.
-  LBMCorruptor corruptor(env_, dd_manager_->GetDirs(), SeedRandom());
-  ASSERT_OK(corruptor.Init());
+  auto corruptor = LBMCorruptor::Create(env_, dd_manager_->GetDirs(), 
SeedRandom());
+  ASSERT_OK(corruptor->Init());
   for (int i = 0; i < kNumRecords; i++) {
-    ASSERT_OK(corruptor.AddPartialRecordToContainer());
+    ASSERT_OK(corruptor->AddPartialRecordToContainer());
   }
 
   // Check the report.

Reply via email to