This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c6a770e [util] extra test for MaintenanceManager
c6a770e is described below
commit c6a770ee54d5d5ef0861594476b40bb14cfe0623
Author: Alexey Serbin <[email protected]>
AuthorDate: Sat Jan 9 00:25:12 2021 -0800
[util] extra test for MaintenanceManager
This patch adds a new test for MaintenanceManager to exercise scheduling
and performing many maintenance operations. In particular, the new test
scenario targets patch [1] that reduces lock contention in
MaintenanceManager::LaunchOp().
This new test scenario produced the following results when running in
RELEASE mode without and with patch [1]:
Before [1]:
spent 5369 milliseconds to process 1000 operations
After [1]:
spent 5308 milliseconds to process 1000 operations
In addition, "perf stat -r 10" and "perf stat -e 'sched:sched_*'"
reported the following:
Before [1]:
Performance counter stats for './maintenance_manager-test
--gtest_filter=*ManyOperationsHeavyUpdateStats*' (10 runs):
6055.786042 task-clock # 0.997 CPUs utilized
( +- 0.07% )
5,900 context-switches # 0.974 K/sec
( +- 0.95% )
430 cpu-migrations # 0.071 K/sec
( +- 30.11% )
7,023 page-faults # 0.001 M/sec
( +- 0.93% )
19,665,826,555 cycles # 3.247 GHz
( +- 0.06% )
24,194,472,924 instructions # 1.23 insns per cycle
( +- 0.08% )
4,673,919,633 branches # 771.811 M/sec
( +- 0.08% )
4,388,734 branch-misses # 0.09% of all branches
( +- 2.42% )
4,362 sched:sched_wakeup
( +- 4.38% )
968 sched:sched_switch
( +- 21.21% )
803 sched:sched_migrate_task
( +- 18.48% )
1,189 sched:sched_stat_wait
( +- 22.66% )
4,360 sched:sched_stat_sleep
( +- 4.38% )
12,268 sched:sched_stat_runtime
( +- 2.85% )
========================================================================
After [1]:
Performance counter stats for './maintenance_manager-test
--gtest_filter=*ManyOperationsHeavyUpdateStats*' (10 runs):
6018.328330 task-clock # 0.991 CPUs utilized
( +- 0.05% )
2,216 context-switches # 0.368 K/sec
( +- 0.14% )
18 cpu-migrations # 0.003 K/sec
( +- 4.30% )
7,136 page-faults # 0.001 M/sec
( +- 0.35% )
19,622,200,306 cycles # 3.260 GHz
( +- 0.15% )
24,223,357,126 instructions # 1.23 insns per cycle
( +- 0.17% )
4,682,684,506 branches # 778.071 M/sec
( +- 0.17% )
3,618,985 branch-misses # 0.08% of all branches
( +- 2.16% )
1,035 sched:sched_wakeup
( +- 0.18% )
1 sched:sched_switch
( +- 45.94% )
29 sched:sched_migrate_task
( +- 2.14% )
24 sched:sched_stat_wait
( +- 12.13% )
1,033 sched:sched_stat_sleep
( +- 0.16% )
8,269 sched:sched_stat_runtime
( +- 0.13% )
========================================================================
[1] https://gerrit.cloudera.org/#/c/16934/
Change-Id: Ia9e71b213583c000d4809dcfc885c1d31b3bb9d5
Reviewed-on: http://gerrit.cloudera.org:8080/16937
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/util/maintenance_manager-test.cc | 344 +++++++++++++++++++++++-------
src/kudu/util/maintenance_manager.h | 1 +
2 files changed, 270 insertions(+), 75 deletions(-)
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 99a8b39..428e833 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <atomic>
#include <cmath>
+#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
@@ -29,6 +30,7 @@
#include <string>
#include <thread>
#include <utility>
+#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
@@ -37,10 +39,11 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/maintenance_manager_metrics.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
-#include "kudu/util/mutex.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -49,6 +52,8 @@ using std::list;
using std::shared_ptr;
using std::string;
using std::thread;
+using std::unique_ptr;
+using std::vector;
using strings::Substitute;
METRIC_DEFINE_entity(test);
@@ -73,51 +78,6 @@ namespace kudu {
static const int kHistorySize = 10;
static const char kFakeUuid[] = "12345";
-class MaintenanceManagerTest : public KuduTest {
- public:
- MaintenanceManagerTest()
- : metric_entity_(METRIC_ENTITY_server.Instantiate(
- &metric_registry_, "test_entity")) {
- }
-
- void SetUp() override {
- StartManager(2);
- }
-
- void TearDown() override {
- StopManager();
- }
-
- void StartManager(int32_t num_threads) {
- MaintenanceManager::Options options;
- options.num_threads = num_threads;
- options.polling_interval_ms = 1;
- options.history_size = kHistorySize;
- manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
- manager_->set_memory_pressure_func_for_tests(
- [&](double* /* consumption */) {
- return indicate_memory_pressure_.load();
- });
- ASSERT_OK(manager_->Start());
- }
-
- void StopManager() {
- manager_->Shutdown();
- }
-
- protected:
- MetricRegistry metric_registry_;
- scoped_refptr<MetricEntity> metric_entity_;
-
- shared_ptr<MaintenanceManager> manager_;
- std::atomic<bool> indicate_memory_pressure_ { false };
-};
-
-// Just create the MaintenanceManager and then shut it down, to make sure
-// there are no race conditions there.
-TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
-}
-
class TestMaintenanceOp : public MaintenanceOp {
public:
TestMaintenanceOp(const std::string& name,
@@ -137,14 +97,16 @@ class TestMaintenanceOp : public MaintenanceOp {
remaining_runs_(1),
prepared_runs_(0),
sleep_time_(MonoDelta::FromSeconds(0)),
+ update_stats_time_(MonoDelta::FromSeconds(0)),
priority_(priority),
- workload_score_(0) {
+ workload_score_(0),
+ update_stats_count_(0) {
}
~TestMaintenanceOp() override = default;
bool Prepare() override {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
if (remaining_runs_ == 0) {
return false;
}
@@ -156,7 +118,7 @@ class TestMaintenanceOp : public MaintenanceOp {
void Perform() override {
{
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
DLOG(INFO) << "Performing op " << name();
// Ensure that we don't call Perform() more times than we returned
@@ -166,6 +128,11 @@ class TestMaintenanceOp : public MaintenanceOp {
}
SleepFor(sleep_time_);
+
+ {
+ std::lock_guard<simple_spinlock> guard(lock_);
+ completed_at_ = MonoTime::Now();
+ }
}
void UpdateStats(MaintenanceOpStats* stats) override {
@@ -173,41 +140,57 @@ class TestMaintenanceOp : public MaintenanceOp {
start_stats_latch_->CountDown();
DCHECK_NOTNULL(continue_stats_latch_)->Wait();
}
- std::lock_guard<Mutex> guard(lock_);
+
+ if (update_stats_time_.ToNanoseconds() > 0) {
+ const auto run_until = MonoTime::Now() + update_stats_time_;
+ volatile size_t cnt = 0;
+ while (MonoTime::Now() < run_until) {
+ ++cnt;
+ }
+ }
+
+ std::lock_guard<simple_spinlock> guard(lock_);
stats->set_runnable(remaining_runs_ > 0);
stats->set_ram_anchored(ram_anchored_);
stats->set_logs_retained_bytes(logs_retained_bytes_);
stats->set_perf_improvement(perf_improvement_);
stats->set_workload_score(workload_score_);
+
+ ++update_stats_count_;
}
void set_remaining_runs(int runs) {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
remaining_runs_ = runs;
}
void set_sleep_time(MonoDelta time) {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
sleep_time_ = time;
}
+ void set_update_stats_time(MonoDelta time) {
+ std::lock_guard<simple_spinlock> guard(lock_);
+ update_stats_time_ = time;
+ }
+
void set_ram_anchored(uint64_t ram_anchored) {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
ram_anchored_ = ram_anchored;
}
void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
logs_retained_bytes_ = logs_retained_bytes;
}
void set_perf_improvement(uint64_t perf_improvement) {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
perf_improvement_ = perf_improvement;
}
void set_workload_score(uint64_t workload_score) {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
workload_score_ = workload_score;
}
@@ -224,12 +207,22 @@ class TestMaintenanceOp : public MaintenanceOp {
}
int remaining_runs() const {
- std::lock_guard<Mutex> guard(lock_);
+ std::lock_guard<simple_spinlock> guard(lock_);
return remaining_runs_;
}
+ uint64_t update_stats_count() const {
+ std::lock_guard<simple_spinlock> guard(lock_);
+ return update_stats_count_;
+ }
+
+ MonoTime completed_at() const {
+ std::lock_guard<simple_spinlock> guard(lock_);
+ return completed_at_;
+ }
+
private:
- mutable Mutex lock_;
+ mutable simple_spinlock lock_;
// Latch used to help other threads wait for us to begin updating stats for
// this op. Another thread may wait for this latch, and once the countdown is
@@ -245,7 +238,7 @@ class TestMaintenanceOp : public MaintenanceOp {
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
scoped_refptr<Histogram> maintenance_op_duration_;
- scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
+ scoped_refptr<AtomicGauge<uint32_t>> maintenance_ops_running_;
// The number of remaining times this operation will run before disabling
// itself.
@@ -256,12 +249,82 @@ class TestMaintenanceOp : public MaintenanceOp {
// The amount of time each op invocation will sleep.
MonoDelta sleep_time_;
+ // The amount of time each UpdateStats will sleep.
+ MonoDelta update_stats_time_;
+
// Maintenance priority.
int32_t priority_;
double workload_score_;
+
+ // Number of times the 'UpdateStats()' method is called on this instance.
+ uint64_t update_stats_count_;
+
+ // Timestamp of the monotonous clock when the operation was completed.
+ MonoTime completed_at_;
};
+class MaintenanceManagerTest : public KuduTest {
+ public:
+ MaintenanceManagerTest()
+ : metric_entity_(METRIC_ENTITY_server.Instantiate(
+ &metric_registry_, "test_entity")) {
+ }
+
+ void SetUp() override {
+ StartManager(2);
+ }
+
+ void TearDown() override {
+ StopManager();
+ }
+
+ void StartManager(int32_t num_threads) {
+ MaintenanceManager::Options options;
+ options.num_threads = num_threads;
+ options.polling_interval_ms = 1;
+ options.history_size = kHistorySize;
+ manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
+ manager_->set_memory_pressure_func_for_tests(
+ [&](double* /* consumption */) {
+ return indicate_memory_pressure_.load();
+ });
+ ASSERT_OK(manager_->Start());
+ }
+
+ void StopManager() {
+ manager_->Shutdown();
+ }
+
+ void WaitForSchedulerThreadRunning(const string& op_name) {
+ // Register an op whose sole purpose is to make sure the MM scheduler
+ // thread is running.
+ TestMaintenanceOp canary_op(op_name, MaintenanceOp::HIGH_IO_USAGE, 0);
+ canary_op.set_perf_improvement(1);
+ manager_->RegisterOp(&canary_op);
+ // Unregister the 'canary_op' operation if it goes out of scope to avoid
+ // de-referencing invalid pointers.
+ SCOPED_CLEANUP({
+ manager_->UnregisterOp(&canary_op);
+ });
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_EQ(0, canary_op.remaining_runs());
+ });
+ }
+
+ protected:
+ MetricRegistry metric_registry_;
+ scoped_refptr<MetricEntity> metric_entity_;
+
+ shared_ptr<MaintenanceManager> manager_;
+ std::atomic<bool> indicate_memory_pressure_ { false };
+};
+
+// Just create the MaintenanceManager and then shut it down, to make sure
+// there are no race conditions there.
+TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
+}
+
// Create an op and wait for it to start running. Unregister it while it is
// running and verify that UnregisterOp waits for it to finish before
// proceeding.
@@ -557,27 +620,15 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
StopManager();
StartManager(1);
- // Register an op whose sole purpose is to allow us to delay the rest of this
- // test until we know that the MM scheduler thread is running.
- TestMaintenanceOp early_op("early", MaintenanceOp::HIGH_IO_USAGE, 0);
- early_op.set_perf_improvement(1);
- manager_->RegisterOp(&early_op);
- // From this point forward if an ASSERT fires, we'll hit a CHECK failure if
- // we don't unregister an op before it goes out of scope.
- SCOPED_CLEANUP({
- manager_->UnregisterOp(&early_op);
- });
- ASSERT_EVENTUALLY([&]() {
- ASSERT_EQ(0, early_op.remaining_runs());
- });
+ NO_FATALS(WaitForSchedulerThreadRunning("canary"));
// The MM scheduler thread is now running. It is now safe to use
// FLAGS_enable_maintenance_manager to temporarily disable the MM, thus
// allowing us to register a group of ops "atomically" and ensuring the op
// execution order that this test wants to see.
//
- // Without the "early op" hack above, there's a small chance that the MM
- // scheduler thread will not have run at all at the time of
+ // Without the WaitForSchedulerThreadRunning() above, there's a small chance
+ // that the MM scheduler thread will not have run at all at the time of
// FLAGS_enable_maintenance_manager = false, which would cause the thread
// to exit entirely instead of sleeping.
@@ -675,7 +726,7 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
"op5",
"op6",
"op7",
- "early"});
+ "canary"});
ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size());
for (const auto& instance : status_pb.completed_operations()) {
ASSERT_EQ(ordered_ops.front(), instance.name());
@@ -683,4 +734,147 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
}
}
+// Check for MaintenanceManager metrics.
+TEST_F(MaintenanceManagerTest, VerifyMetrics) {
+ // Nothing has failed so far.
+ ASSERT_EQ(0, manager_->metrics_.op_prepare_failed->value());
+
+ // The oppf's Prepare() returns 'false', meaning the operation failed to
+ // prepare. However, the scores for this operation is set higher than the
+ // other two to make sure the scheduler starts working on this task before
+ // the other two.
+ class PrepareFailedMaintenanceOp : public TestMaintenanceOp {
+ public:
+ PrepareFailedMaintenanceOp()
+ : TestMaintenanceOp("oppf", MaintenanceOp::HIGH_IO_USAGE) {
+ }
+
+ bool Prepare() override {
+ set_remaining_runs(0);
+ return false;
+ }
+ } oppf;
+ oppf.set_perf_improvement(3);
+ oppf.set_workload_score(3);
+
+ TestMaintenanceOp op0("op0", MaintenanceOp::HIGH_IO_USAGE);
+ op0.set_perf_improvement(2);
+ op0.set_workload_score(2);
+ op0.set_update_stats_time(MonoDelta::FromMicroseconds(10000));
+
+ TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE);
+ op1.set_perf_improvement(1);
+ op1.set_workload_score(1);
+
+ manager_->RegisterOp(&oppf);
+ manager_->RegisterOp(&op0);
+ manager_->RegisterOp(&op1);
+ SCOPED_CLEANUP({
+ manager_->UnregisterOp(&op1);
+ manager_->UnregisterOp(&op0);
+ manager_->UnregisterOp(&oppf);
+ });
+
+ ASSERT_EVENTUALLY([&]() {
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ // Only 2 operations should successfully complete so far: op0, op1.
+ // Operation oppf should fail during Prepare().
+ ASSERT_EQ(2, status_pb.completed_operations_size());
+ });
+
+ {
+ // A sanity check: no operations should be running.
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ ASSERT_EQ(0, status_pb.running_operations_size());
+
+ ASSERT_EQ(1, op0.DurationHistogram()->TotalCount());
+ ASSERT_EQ(1, op1.DurationHistogram()->TotalCount());
+ ASSERT_EQ(0, oppf.DurationHistogram()->TotalCount());
+ }
+
+ // Exactly one operation has failed prepare: oppf.
+ ASSERT_EQ(1, manager_->metrics_.op_prepare_failed->value());
+
+ // There should be at least 3 runs of the FindBestOp() method by this time
+ // becase 3 operations have been scheduled: oppf, op0, op1,
+ // but it might be many more since the scheduler is still active.
+ ASSERT_LE(3, manager_->metrics_.op_find_duration->TotalCount());
+
+ // Max time taken by FindBestOp() should be at least 10 msec since that's
+ // the mininum duration of op0's UpdateStats().
+ ASSERT_GE(manager_->metrics_.op_find_duration->MaxValueForTests(), 10000);
+}
+
+// This test scenario verifies that maintenance manager is able to process
+// operations with high enough level of concurrency, even if their UpdateStats()
+// method is computationally heavy.
+TEST_F(MaintenanceManagerTest, ManyOperationsHeavyUpdateStats) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ StopManager();
+ StartManager(4);
+
+ NO_FATALS(WaitForSchedulerThreadRunning("canary"));
+
+ constexpr auto kOpsNum = 1000;
+ vector<unique_ptr<TestMaintenanceOp>> ops;
+ ops.reserve(kOpsNum);
+ for (auto i = 0; i < kOpsNum; ++i) {
+ unique_ptr<TestMaintenanceOp> op(new TestMaintenanceOp(
+ std::to_string(i), MaintenanceOp::HIGH_IO_USAGE));
+ op->set_perf_improvement(i + 1);
+ op->set_workload_score(i + 1);
+ op->set_remaining_runs(1);
+ op->set_sleep_time(MonoDelta::FromMilliseconds(i % 8 + 1));
+ op->set_update_stats_time(MonoDelta::FromMicroseconds(5));
+ ops.emplace_back(std::move(op));
+ }
+
+ // For cleaner timings, disable processing of the registered operations,
+ // and re-enable that after registering all operations.
+ FLAGS_enable_maintenance_manager = false;
+ for (auto& op : ops) {
+ manager_->RegisterOp(op.get());
+ }
+ FLAGS_enable_maintenance_manager = true;
+ MonoTime time_started = MonoTime::Now();
+
+ SCOPED_CLEANUP({
+ for (auto& op : ops) {
+ manager_->UnregisterOp(op.get());
+ }
+ });
+
+ // Given the performance improvement scores and workload scores assigned
+ // to the maintenance operations, the operation scheduled first should
+ // be processed last. Once it's done, no other operations should be running.
+ AssertEventually([&] {
+ ASSERT_EQ(1, ops.front()->DurationHistogram()->TotalCount());
+ }, MonoDelta::FromSeconds(60));
+
+ // A sanity check: verify no operations are left running, but all are still
+ // registered.
+ {
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ ASSERT_EQ(0, status_pb.running_operations_size());
+ ASSERT_EQ(kOpsNum, status_pb.registered_operations_size());
+ }
+
+ MonoTime time_completed = time_started;
+ for (const auto& op : ops) {
+ const auto op_time_completed = op->completed_at();
+ if (op_time_completed > time_completed) {
+ time_completed = op_time_completed;
+ }
+ }
+ const auto time_spent = time_completed - time_started;
+ LOG(INFO) << Substitute("spent $0 milliseconds to process $1 operations",
+ time_spent.ToMilliseconds(), kOpsNum);
+ LOG(INFO) << Substitute("number of UpdateStats() calls per operation: $0",
+ ops.front()->update_stats_count());
+}
+
} // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index f14c498..2e1f48b 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -340,6 +340,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
FRIEND_TEST(MaintenanceManagerTest, TestPrioritizeLogRetentionUnderMemoryPressure);
FRIEND_TEST(MaintenanceManagerTest, TestOpFactors);
+ FRIEND_TEST(MaintenanceManagerTest, VerifyMetrics);
typedef std::map<MaintenanceOp*, MaintenanceOpStats,
MaintenanceOpComparator> OpMapType;