This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 032d1d90147139db62a7cea8700d5373332bde9a
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Oct 18 18:53:06 2021 +0800

    [LBM] Speed up server bootstrap by using multi-thread to compact containers
    
When a data directory has many low-live-block containers, compacting these
container metadata files with a single thread is inefficient; we can improve
it by using multiple threads.
In some test cases -- e.g. one or very few data directories, high-latency
disk drives, a large number of low-live-block containers, and a multi-core
CPU -- this patch could reduce the time spent in the metadata file
compaction procedure during bootstrap by about 85%.
    
    Change-Id: Ie48211d9e8c1d74e520fcb04df25c1d681261bb5
    Reviewed-on: http://gerrit.cloudera.org:8080/17942
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/fs/log_block_manager-test.cc | 52 ++++++++++++++++++++---
 src/kudu/fs/log_block_manager.cc      | 80 ++++++++++++++++++++++++++---------
 2 files changed, 106 insertions(+), 26 deletions(-)

diff --git a/src/kudu/fs/log_block_manager-test.cc 
b/src/kudu/fs/log_block_manager-test.cc
index 6af5992..9bd6e24 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -92,6 +92,12 @@ DEFINE_int32(startup_benchmark_block_count_for_testing, 
1000000,
              "Block count to do startup benchmark.");
 DEFINE_int32(startup_benchmark_data_dir_count_for_testing, 8,
              "Data directories to do startup benchmark.");
+DEFINE_int32(startup_benchmark_reopen_times, 10,
+             "Block manager reopen times.");
+DEFINE_int32(startup_benchmark_deleted_block_percentage, 90,
+             "Percentage of deleted blocks in containers.");
+DEFINE_validator(startup_benchmark_deleted_block_percentage,
+                 [](const char* /*n*/, int32_t v) { return 0 <= v && v <= 100; 
});
 
 // Block manager metrics.
 METRIC_DECLARE_counter(block_manager_total_blocks_deleted);
@@ -1008,15 +1014,28 @@ TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
 // Simple micro-benchmark which creates a large number of blocks and then
 // times the startup of the LBM.
 //
-// This is simplistic in several ways compared to a typical workload:
-// - minimal number of containers, each of which is entirely full
+// This is simplistic in several ways compared to two typical workloads:
+// 1. minimal number of containers, each of which is entirely full
+//    without any deleted blocks.
 //   (typical workloads end up writing to several containers at once
 //    due to concurrent write operations such as multiple MM threads
 //    flushing)
-// - no deleted blocks to process
+// 2. minimal number of containers, each of which is entirely full
+//    with about --startup_benchmark_deleted_block_percentage percent
+//    deleted blocks.
+//   (typical workloads of write, alter operations, and background MM
+//    threads running a long time since last bootstrap)
 //
 // However it still can be used to micro-optimize the startup process.
-TEST_F(LogBlockManagerTest, StartupBenchmark) {
+class LogBlockManagerStartupBenchmarkTest:
+    public LogBlockManagerTest,
+    public ::testing::WithParamInterface<bool> {
+};
+INSTANTIATE_TEST_SUITE_P(StartupBenchmarkSuite, 
LogBlockManagerStartupBenchmarkTest,
+                         ::testing::Values(false, true));
+
+TEST_P(LogBlockManagerStartupBenchmarkTest, StartupBenchmark) {
+  bool delete_blocks = GetParam();
   std::vector<std::string> test_dirs;
   for (int i = 0; i < FLAGS_startup_benchmark_data_dir_count_for_testing; ++i) 
{
     test_dirs.emplace_back(test_dir_ + "/" + std::to_string(i));
@@ -1034,7 +1053,9 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) {
   // for details.
   FLAGS_block_manager_preflush_control = "never";
   const int kNumBlocks = AllowSlowTests() ? 
FLAGS_startup_benchmark_block_count_for_testing : 1000;
+
   // Creates 'kNumBlocks' blocks with minimal data.
+  vector<BlockId> block_ids;
   {
     unique_ptr<BlockCreationTransaction> transaction = 
bm_->NewCreationTransaction();
     for (int i = 0; i < kNumBlocks; i++) {
@@ -1042,11 +1063,32 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) {
       ASSERT_OK_FAST(bm_->CreateBlock(test_block_opts_, &block));
       ASSERT_OK_FAST(block->Append("x"));
       ASSERT_OK_FAST(block->Finalize());
+      block_ids.emplace_back(block->id());
       transaction->AddCreatedBlock(std::move(block));
     }
     ASSERT_OK(transaction->CommitCreatedBlocks());
   }
-  for (int i = 0; i < 10; i++) {
+
+  if (delete_blocks) {
+    std::mt19937 gen(SeedRandom());
+    std::shuffle(block_ids.begin(), block_ids.end(), gen);
+    {
+      int to_delete_count =
+          block_ids.size() * FLAGS_startup_benchmark_deleted_block_percentage 
/ 100;
+      shared_ptr<BlockDeletionTransaction> deletion_transaction =
+          this->bm_->NewDeletionTransaction();
+      for (const BlockId& b : block_ids) {
+        deletion_transaction->AddDeletedBlock(b);
+        if (--to_delete_count <= 0) {
+          break;
+        }
+      }
+      vector<BlockId> deleted;
+      ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+    }
+  }
+
+  for (int i = 0; i < FLAGS_startup_benchmark_reopen_times; i++) {
     LOG_TIMING(INFO, "reopening block manager") {
       ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs));
     }
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 9a17cdb..4cb27f5 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -20,6 +20,7 @@
 #include <errno.h>
 
 #include <algorithm>
+#include <atomic>
 #include <cstddef>
 #include <cstdint>
 #include <functional>
@@ -57,6 +58,7 @@
 #include "kudu/util/alignment.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/env.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
@@ -69,7 +71,9 @@
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/sorted_disjoint_interval_list.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util_prod.h"
+#include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
 DECLARE_bool(enable_data_block_fsync);
@@ -123,6 +127,11 @@ DEFINE_bool(log_block_manager_delete_dead_container, true,
 TAG_FLAG(log_block_manager_delete_dead_container, advanced);
 TAG_FLAG(log_block_manager_delete_dead_container, experimental);
 
+DEFINE_int32(log_container_metadata_rewrite_inject_latency_ms, 0,
+             "Amount of latency in ms to inject when rewrite metadata file. "
+             "Only for testing.");
+TAG_FLAG(log_container_metadata_rewrite_inject_latency_ms, hidden);
+
 METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management,
                            "Bytes Under Management",
                            kudu::MetricUnit::kBytes,
@@ -2119,6 +2128,10 @@ Status LogBlockManager::Open(FsReport* report, 
std::atomic<int>* containers_proc
   dd_manager_->WaitOnClosures();
 
   // Check load errors and merge each data dir's container load results, then 
do repair tasks.
+  unique_ptr<ThreadPool> repair_pool;
+  RETURN_NOT_OK(ThreadPoolBuilder("repair_pool")
+                    .set_max_threads(dd_manager_->dirs().size())
+                    .Build(&repair_pool));
   vector<unique_ptr<internal::LogBlockContainerLoadResult>> dir_results(
       dd_manager_->dirs().size());
   for (int i = 0; i < dd_manager_->dirs().size(); ++i) {
@@ -2162,12 +2175,13 @@ Status LogBlockManager::Open(FsReport* report, 
std::atomic<int>* containers_proc
       dir_results[i] = std::move(dir_result);
       auto* dd_raw = dd.get();
       auto* dr = dir_results[i].get();
-      dd->ExecClosure([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); });
+      CHECK_OK(repair_pool->Submit([this, dd_raw, dr]() { 
this->RepairTask(dd_raw, dr); }));
     }
   }
 
   // Wait for the repair tasks to complete.
-  dd_manager_->WaitOnClosures();
+  repair_pool->Wait();
+  repair_pool->Shutdown();
 
   FsReport merged_report;
   for (int i = 0; i < dd_manager_->dirs().size(); ++i) {
@@ -2970,33 +2984,55 @@ Status LogBlockManager::Repair(
 
   // "Compact" metadata files with few live blocks by rewriting them with only
   // the live block records.
-  int64_t metadata_files_compacted = 0;
-  int64_t metadata_bytes_delta = 0;
+  std::atomic<int64_t> metadata_files_compacted = 0;
+  std::atomic<int64_t> metadata_bytes_delta = 0;
+  std::atomic<bool> seen_fatal_error = false;
+  Status first_fatal_error;
+  SCOPED_LOG_TIMING(INFO, "loading block containers with low live blocks");
   for (const auto& e : low_live_block_containers) {
+    if (seen_fatal_error.load()) {
+      break;
+    }
     LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, 
e.first);
     if (!container) {
       // The container was deleted outright.
       continue;
     }
 
-    // Rewrite this metadata file. Failures are non-fatal.
-    int64_t file_bytes_delta;
-    const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
-    Status s = RewriteMetadataFile(*(container.get()), e.second, 
&file_bytes_delta);
-    if (!s.ok()) {
-      WARN_NOT_OK(s, "could not rewrite metadata file");
-      continue;
-    }
+    dir->ExecClosure([this, &metadata_files_compacted, &metadata_bytes_delta,
+        &seen_fatal_error, &first_fatal_error, e, container]() {
+      // Rewrite this metadata file.
+      int64_t file_bytes_delta;
+      const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
+      Status s = RewriteMetadataFile(*(container.get()), e.second, 
&file_bytes_delta);
+      if (!s.ok()) {
+        // Rewrite metadata file failure is non-fatal, just skip it.
+        WARN_NOT_OK(s, Substitute("could not rewrite metadata file $0", 
meta_path));
+        return;
+      }
 
-    // However, we're hosed if we can't open the new metadata file.
-    RETURN_NOT_OK_PREPEND(container->ReopenMetadataWriter(),
-                          "could not reopen new metadata file");
+      // Reopen the new metadata file.
+      s = container->ReopenMetadataWriter();
+      if (!s.ok()) {
+        // Open the new metadata file failure is fatal, stop processing other 
containers.
+        bool current_seen_error = false;
+        if (seen_fatal_error.compare_exchange_strong(current_seen_error, 
true)) {
+          first_fatal_error = s.CloneAndPrepend(
+              Substitute("could not reopen new metadata file", meta_path));
+        }
+        return;
+      }
 
-    metadata_files_compacted++;
-    metadata_bytes_delta += file_bytes_delta;
-    VLOG(1) << "Compacted metadata file " << meta_path
-            << " (saved " << file_bytes_delta << " bytes)";
+      metadata_files_compacted++;
+      metadata_bytes_delta += file_bytes_delta;
+      VLOG(1) << "Compacted metadata file " << meta_path << " (saved " << 
file_bytes_delta
+              << " bytes)";
+    });
+  }
 
+  dir->WaitOnClosures();
+  if (seen_fatal_error.load()) {
+    LOG_AND_RETURN(WARNING, first_fatal_error);
   }
 
   // The data directory can be synchronized once for all of the new metadata 
files.
@@ -3009,11 +3045,11 @@ Status LogBlockManager::Repair(
   // TODO(awong): The below will only be true with persistent disk states.
   // Disk failures do not suffer from this issue because, on the next startup,
   // the entire directory will not be used.
-  if (metadata_files_compacted > 0) {
+  if (metadata_files_compacted.load() > 0) {
     Status s = env_->SyncDir(dir->dir());
     RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(s, "Could not sync data directory");
     LOG(INFO) << Substitute("Compacted $0 metadata files ($1 metadata bytes)",
-                            metadata_files_compacted, metadata_bytes_delta);
+                            metadata_files_compacted.load(), 
metadata_bytes_delta.load());
   }
 
   return Status::OK();
@@ -3022,6 +3058,8 @@ Status LogBlockManager::Repair(
 Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
                                             const vector<BlockRecordPB>& 
records,
                                             int64_t* file_bytes_delta) {
+  
MAYBE_INJECT_FIXED_LATENCY(FLAGS_log_container_metadata_rewrite_inject_latency_ms);
+
   const string metadata_file_name = StrCat(container.ToString(), 
kContainerMetadataFileSuffix);
   // Get the container's data directory's UUID for error handling.
   const string dir = container.data_dir()->dir();

Reply via email to