This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 032d1d90147139db62a7cea8700d5373332bde9a Author: Yingchun Lai <[email protected]> AuthorDate: Mon Oct 18 18:53:06 2021 +0800 [LBM] Speed up server bootstrap by using multiple threads to compact containers When a data directory has many low live block containers, using a single thread to compact these container metadata files is inefficient; we can improve it by using multiple threads. In some test cases, e.g. one or very few data directories, high-latency disk drives, a large number of low live block containers, and a multi-core CPU, this patch could reduce the time spent in the metadata file compaction procedure during bootstrap by about 85%. Change-Id: Ie48211d9e8c1d74e520fcb04df25c1d681261bb5 Reviewed-on: http://gerrit.cloudera.org:8080/17942 Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Alexey Serbin <[email protected]> --- src/kudu/fs/log_block_manager-test.cc | 52 ++++++++++++++++++++--- src/kudu/fs/log_block_manager.cc | 80 ++++++++++++++++++++++++++--------- 2 files changed, 106 insertions(+), 26 deletions(-) diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc index 6af5992..9bd6e24 100644 --- a/src/kudu/fs/log_block_manager-test.cc +++ b/src/kudu/fs/log_block_manager-test.cc @@ -92,6 +92,12 @@ DEFINE_int32(startup_benchmark_block_count_for_testing, 1000000, "Block count to do startup benchmark."); DEFINE_int32(startup_benchmark_data_dir_count_for_testing, 8, "Data directories to do startup benchmark."); +DEFINE_int32(startup_benchmark_reopen_times, 10, + "Block manager reopen times."); +DEFINE_int32(startup_benchmark_deleted_block_percentage, 90, + "Percentage of deleted blocks in containers."); +DEFINE_validator(startup_benchmark_deleted_block_percentage, + [](const char* /*n*/, int32_t v) { return 0 <= v && v <= 100; }); // Block manager metrics. 
METRIC_DECLARE_counter(block_manager_total_blocks_deleted); @@ -1008,15 +1014,28 @@ TEST_F(LogBlockManagerTest, TestParseKernelRelease) { // Simple micro-benchmark which creates a large number of blocks and then // times the startup of the LBM. // -// This is simplistic in several ways compared to a typical workload: -// - minimal number of containers, each of which is entirely full +// This is simplistic in several ways compared to two typical workloads: +// 1. minimal number of containers, each of which is entirely full +// without any deleted blocks. // (typical workloads end up writing to several containers at once // due to concurrent write operations such as multiple MM threads // flushing) -// - no deleted blocks to process +// 2. minimal number of containers, each of which is entirely full +// with about --startup_benchmark_deleted_block_percentage percent +// deleted blocks. +// (typical workloads of write, alter operations, and background MM +// threads running a long time since last bootstrap) // // However it still can be used to micro-optimize the startup process. -TEST_F(LogBlockManagerTest, StartupBenchmark) { +class LogBlockManagerStartupBenchmarkTest: + public LogBlockManagerTest, + public ::testing::WithParamInterface<bool> { +}; +INSTANTIATE_TEST_SUITE_P(StartupBenchmarkSuite, LogBlockManagerStartupBenchmarkTest, + ::testing::Values(false, true)); + +TEST_P(LogBlockManagerStartupBenchmarkTest, StartupBenchmark) { + bool delete_blocks = GetParam(); std::vector<std::string> test_dirs; for (int i = 0; i < FLAGS_startup_benchmark_data_dir_count_for_testing; ++i) { test_dirs.emplace_back(test_dir_ + "/" + std::to_string(i)); @@ -1034,7 +1053,9 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) { // for details. FLAGS_block_manager_preflush_control = "never"; const int kNumBlocks = AllowSlowTests() ? FLAGS_startup_benchmark_block_count_for_testing : 1000; + // Creates 'kNumBlocks' blocks with minimal data. 
+ vector<BlockId> block_ids; { unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction(); for (int i = 0; i < kNumBlocks; i++) { @@ -1042,11 +1063,32 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) { ASSERT_OK_FAST(bm_->CreateBlock(test_block_opts_, &block)); ASSERT_OK_FAST(block->Append("x")); ASSERT_OK_FAST(block->Finalize()); + block_ids.emplace_back(block->id()); transaction->AddCreatedBlock(std::move(block)); } ASSERT_OK(transaction->CommitCreatedBlocks()); } - for (int i = 0; i < 10; i++) { + + if (delete_blocks) { + std::mt19937 gen(SeedRandom()); + std::shuffle(block_ids.begin(), block_ids.end(), gen); + { + int to_delete_count = + block_ids.size() * FLAGS_startup_benchmark_deleted_block_percentage / 100; + shared_ptr<BlockDeletionTransaction> deletion_transaction = + this->bm_->NewDeletionTransaction(); + for (const BlockId& b : block_ids) { + deletion_transaction->AddDeletedBlock(b); + if (--to_delete_count <= 0) { + break; + } + } + vector<BlockId> deleted; + ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted)); + } + } + + for (int i = 0; i < FLAGS_startup_benchmark_reopen_times; i++) { LOG_TIMING(INFO, "reopening block manager") { ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs)); } diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index 9a17cdb..4cb27f5 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -20,6 +20,7 @@ #include <errno.h> #include <algorithm> +#include <atomic> #include <cstddef> #include <cstdint> #include <functional> @@ -57,6 +58,7 @@ #include "kudu/util/alignment.h" #include "kudu/util/array_view.h" #include "kudu/util/env.h" +#include "kudu/util/fault_injection.h" #include "kudu/util/file_cache.h" #include "kudu/util/flag_tags.h" #include "kudu/util/locks.h" @@ -69,7 +71,9 @@ #include "kudu/util/scoped_cleanup.h" #include "kudu/util/slice.h" #include "kudu/util/sorted_disjoint_interval_list.h" +#include 
"kudu/util/stopwatch.h" #include "kudu/util/test_util_prod.h" +#include "kudu/util/threadpool.h" #include "kudu/util/trace.h" DECLARE_bool(enable_data_block_fsync); @@ -123,6 +127,11 @@ DEFINE_bool(log_block_manager_delete_dead_container, true, TAG_FLAG(log_block_manager_delete_dead_container, advanced); TAG_FLAG(log_block_manager_delete_dead_container, experimental); +DEFINE_int32(log_container_metadata_rewrite_inject_latency_ms, 0, + "Amount of latency in ms to inject when rewrite metadata file. " + "Only for testing."); +TAG_FLAG(log_container_metadata_rewrite_inject_latency_ms, hidden); + METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management, "Bytes Under Management", kudu::MetricUnit::kBytes, @@ -2119,6 +2128,10 @@ Status LogBlockManager::Open(FsReport* report, std::atomic<int>* containers_proc dd_manager_->WaitOnClosures(); // Check load errors and merge each data dir's container load results, then do repair tasks. + unique_ptr<ThreadPool> repair_pool; + RETURN_NOT_OK(ThreadPoolBuilder("repair_pool") + .set_max_threads(dd_manager_->dirs().size()) + .Build(&repair_pool)); vector<unique_ptr<internal::LogBlockContainerLoadResult>> dir_results( dd_manager_->dirs().size()); for (int i = 0; i < dd_manager_->dirs().size(); ++i) { @@ -2162,12 +2175,13 @@ Status LogBlockManager::Open(FsReport* report, std::atomic<int>* containers_proc dir_results[i] = std::move(dir_result); auto* dd_raw = dd.get(); auto* dr = dir_results[i].get(); - dd->ExecClosure([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); }); + CHECK_OK(repair_pool->Submit([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); })); } } // Wait for the repair tasks to complete. 
- dd_manager_->WaitOnClosures(); + repair_pool->Wait(); + repair_pool->Shutdown(); FsReport merged_report; for (int i = 0; i < dd_manager_->dirs().size(); ++i) { @@ -2970,33 +2984,55 @@ Status LogBlockManager::Repair( // "Compact" metadata files with few live blocks by rewriting them with only // the live block records. - int64_t metadata_files_compacted = 0; - int64_t metadata_bytes_delta = 0; + std::atomic<int64_t> metadata_files_compacted = 0; + std::atomic<int64_t> metadata_bytes_delta = 0; + std::atomic<bool> seen_fatal_error = false; + Status first_fatal_error; + SCOPED_LOG_TIMING(INFO, "loading block containers with low live blocks"); for (const auto& e : low_live_block_containers) { + if (seen_fatal_error.load()) { + break; + } LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, e.first); if (!container) { // The container was deleted outright. continue; } - // Rewrite this metadata file. Failures are non-fatal. - int64_t file_bytes_delta; - const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix); - Status s = RewriteMetadataFile(*(container.get()), e.second, &file_bytes_delta); - if (!s.ok()) { - WARN_NOT_OK(s, "could not rewrite metadata file"); - continue; - } + dir->ExecClosure([this, &metadata_files_compacted, &metadata_bytes_delta, + &seen_fatal_error, &first_fatal_error, e, container]() { + // Rewrite this metadata file. + int64_t file_bytes_delta; + const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix); + Status s = RewriteMetadataFile(*(container.get()), e.second, &file_bytes_delta); + if (!s.ok()) { + // Rewrite metadata file failure is non-fatal, just skip it. + WARN_NOT_OK(s, Substitute("could not rewrite metadata file $0", meta_path)); + return; + } - // However, we're hosed if we can't open the new metadata file. - RETURN_NOT_OK_PREPEND(container->ReopenMetadataWriter(), - "could not reopen new metadata file"); + // Reopen the new metadata file. 
+ s = container->ReopenMetadataWriter(); + if (!s.ok()) { + // Open the new metadata file failure is fatal, stop processing other containers. + bool current_seen_error = false; + if (seen_fatal_error.compare_exchange_strong(current_seen_error, true)) { + first_fatal_error = s.CloneAndPrepend( + Substitute("could not reopen new metadata file", meta_path)); + } + return; + } - metadata_files_compacted++; - metadata_bytes_delta += file_bytes_delta; - VLOG(1) << "Compacted metadata file " << meta_path - << " (saved " << file_bytes_delta << " bytes)"; + metadata_files_compacted++; + metadata_bytes_delta += file_bytes_delta; + VLOG(1) << "Compacted metadata file " << meta_path << " (saved " << file_bytes_delta + << " bytes)"; + }); + } + dir->WaitOnClosures(); + if (seen_fatal_error.load()) { + LOG_AND_RETURN(WARNING, first_fatal_error); } // The data directory can be synchronized once for all of the new metadata files. @@ -3009,11 +3045,11 @@ Status LogBlockManager::Repair( // TODO(awong): The below will only be true with persistent disk states. // Disk failures do not suffer from this issue because, on the next startup, // the entire directory will not be used. 
- if (metadata_files_compacted > 0) { + if (metadata_files_compacted.load() > 0) { Status s = env_->SyncDir(dir->dir()); RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(s, "Could not sync data directory"); LOG(INFO) << Substitute("Compacted $0 metadata files ($1 metadata bytes)", - metadata_files_compacted, metadata_bytes_delta); + metadata_files_compacted.load(), metadata_bytes_delta.load()); } return Status::OK(); @@ -3022,6 +3058,8 @@ Status LogBlockManager::Repair( Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container, const vector<BlockRecordPB>& records, int64_t* file_bytes_delta) { + MAYBE_INJECT_FIXED_LATENCY(FLAGS_log_container_metadata_rewrite_inject_latency_ms); + const string metadata_file_name = StrCat(container.ToString(), kContainerMetadataFileSuffix); // Get the container's data directory's UUID for error handling. const string dir = container.data_dir()->dir();
