This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c6a770e  [util] extra test for MaintenanceManager
c6a770e is described below

commit c6a770ee54d5d5ef0861594476b40bb14cfe0623
Author: Alexey Serbin <[email protected]>
AuthorDate: Sat Jan 9 00:25:12 2021 -0800

    [util] extra test for MaintenanceManager
    
    This patch adds a new test for MaintenanceManager to exercise scheduling
    and performing many maintenance operations.  In particular, the new test
    scenario targets patch [1], which reduces lock contention in
    MaintenanceManager::LaunchOp().
    
    This new test scenario produced the following results when running in
    RELEASE mode without and with patch [1]:
    
    Before [1]:
      spent 5369 milliseconds to process 1000 operations
    After  [1]:
      spent 5308 milliseconds to process 1000 operations
    
    In addition, "perf stat -r 10" and "perf stat -e 'sched:sched_*'"
    reported the following:
    
    Before [1]:
      Performance counter stats for './maintenance_manager-test --gtest_filter=*ManyOperationsHeavyUpdateStats*' (10 runs):
    
           6055.786042 task-clock                #    0.997 CPUs utilized            ( +-  0.07% )
                 5,900 context-switches          #    0.974 K/sec                    ( +-  0.95% )
                   430 cpu-migrations            #    0.071 K/sec                    ( +- 30.11% )
                 7,023 page-faults               #    0.001 M/sec                    ( +-  0.93% )
        19,665,826,555 cycles                    #    3.247 GHz                      ( +-  0.06% )
        24,194,472,924 instructions              #    1.23  insns per cycle          ( +-  0.08% )
         4,673,919,633 branches                  #  771.811 M/sec                    ( +-  0.08% )
             4,388,734 branch-misses             #    0.09% of all branches          ( +-  2.42% )
    
                 4,362 sched:sched_wakeup                                            ( +-  4.38% )
                   968 sched:sched_switch                                            ( +- 21.21% )
                   803 sched:sched_migrate_task                                      ( +- 18.48% )
                 1,189 sched:sched_stat_wait                                         ( +- 22.66% )
                 4,360 sched:sched_stat_sleep                                        ( +-  4.38% )
                12,268 sched:sched_stat_runtime                                      ( +-  2.85% )
    
    ========================================================================
    
    After  [1]:
      Performance counter stats for './maintenance_manager-test --gtest_filter=*ManyOperationsHeavyUpdateStats*' (10 runs):
    
           6018.328330 task-clock                #    0.991 CPUs utilized            ( +-  0.05% )
                 2,216 context-switches          #    0.368 K/sec                    ( +-  0.14% )
                    18 cpu-migrations            #    0.003 K/sec                    ( +-  4.30% )
                 7,136 page-faults               #    0.001 M/sec                    ( +-  0.35% )
        19,622,200,306 cycles                    #    3.260 GHz                      ( +-  0.15% )
        24,223,357,126 instructions              #    1.23  insns per cycle          ( +-  0.17% )
         4,682,684,506 branches                  #  778.071 M/sec                    ( +-  0.17% )
             3,618,985 branch-misses             #    0.08% of all branches          ( +-  2.16% )
    
                 1,035 sched:sched_wakeup                                            ( +-  0.18% )
                     1 sched:sched_switch                                            ( +- 45.94% )
                    29 sched:sched_migrate_task                                      ( +-  2.14% )
                    24 sched:sched_stat_wait                                         ( +- 12.13% )
                 1,033 sched:sched_stat_sleep                                        ( +-  0.16% )
                 8,269 sched:sched_stat_runtime                                      ( +-  0.13% )
    
    ========================================================================
    
    [1] https://gerrit.cloudera.org/#/c/16934/
    
    Change-Id: Ia9e71b213583c000d4809dcfc885c1d31b3bb9d5
    Reviewed-on: http://gerrit.cloudera.org:8080/16937
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/util/maintenance_manager-test.cc | 344 +++++++++++++++++++++++-------
 src/kudu/util/maintenance_manager.h       |   1 +
 2 files changed, 270 insertions(+), 75 deletions(-)

diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 99a8b39..428e833 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <atomic>
 #include <cmath>
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <list>
@@ -29,6 +30,7 @@
 #include <string>
 #include <thread>
 #include <utility>
+#include <vector>
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -37,10 +39,11 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/maintenance_manager_metrics.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/mutex.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -49,6 +52,8 @@ using std::list;
 using std::shared_ptr;
 using std::string;
 using std::thread;
+using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 METRIC_DEFINE_entity(test);
@@ -73,51 +78,6 @@ namespace kudu {
 static const int kHistorySize = 10;
 static const char kFakeUuid[] = "12345";
 
-class MaintenanceManagerTest : public KuduTest {
- public:
-  MaintenanceManagerTest()
-      : metric_entity_(METRIC_ENTITY_server.Instantiate(
-                       &metric_registry_, "test_entity")) {
-  }
-
-  void SetUp() override {
-    StartManager(2);
-  }
-
-  void TearDown() override {
-    StopManager();
-  }
-
-  void StartManager(int32_t num_threads) {
-    MaintenanceManager::Options options;
-    options.num_threads = num_threads;
-    options.polling_interval_ms = 1;
-    options.history_size = kHistorySize;
-    manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
-    manager_->set_memory_pressure_func_for_tests(
-        [&](double* /* consumption */) {
-          return indicate_memory_pressure_.load();
-        });
-    ASSERT_OK(manager_->Start());
-  }
-
-  void StopManager() {
-    manager_->Shutdown();
-  }
-
- protected:
-  MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
-
-  shared_ptr<MaintenanceManager> manager_;
-  std::atomic<bool> indicate_memory_pressure_ { false };
-};
-
-// Just create the MaintenanceManager and then shut it down, to make sure
-// there are no race conditions there.
-TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
-}
-
 class TestMaintenanceOp : public MaintenanceOp {
  public:
   TestMaintenanceOp(const std::string& name,
@@ -137,14 +97,16 @@ class TestMaintenanceOp : public MaintenanceOp {
       remaining_runs_(1),
       prepared_runs_(0),
       sleep_time_(MonoDelta::FromSeconds(0)),
+      update_stats_time_(MonoDelta::FromSeconds(0)),
       priority_(priority),
-      workload_score_(0) {
+      workload_score_(0),
+      update_stats_count_(0) {
   }
 
   ~TestMaintenanceOp() override = default;
 
   bool Prepare() override {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     if (remaining_runs_ == 0) {
       return false;
     }
@@ -156,7 +118,7 @@ class TestMaintenanceOp : public MaintenanceOp {
 
   void Perform() override {
     {
-      std::lock_guard<Mutex> guard(lock_);
+      std::lock_guard<simple_spinlock> guard(lock_);
       DLOG(INFO) << "Performing op " << name();
 
       // Ensure that we don't call Perform() more times than we returned
@@ -166,6 +128,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     }
 
     SleepFor(sleep_time_);
+
+    {
+      std::lock_guard<simple_spinlock> guard(lock_);
+      completed_at_ = MonoTime::Now();
+    }
   }
 
   void UpdateStats(MaintenanceOpStats* stats) override {
@@ -173,41 +140,57 @@ class TestMaintenanceOp : public MaintenanceOp {
       start_stats_latch_->CountDown();
       DCHECK_NOTNULL(continue_stats_latch_)->Wait();
     }
-    std::lock_guard<Mutex> guard(lock_);
+
+    if (update_stats_time_.ToNanoseconds() > 0) {
+      const auto run_until = MonoTime::Now() + update_stats_time_;
+      volatile size_t cnt = 0;
+      while (MonoTime::Now() < run_until) {
+        ++cnt;
+      }
+    }
+
+    std::lock_guard<simple_spinlock> guard(lock_);
     stats->set_runnable(remaining_runs_ > 0);
     stats->set_ram_anchored(ram_anchored_);
     stats->set_logs_retained_bytes(logs_retained_bytes_);
     stats->set_perf_improvement(perf_improvement_);
     stats->set_workload_score(workload_score_);
+
+    ++update_stats_count_;
   }
 
   void set_remaining_runs(int runs) {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     remaining_runs_ = runs;
   }
 
   void set_sleep_time(MonoDelta time) {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     sleep_time_ = time;
   }
 
+  void set_update_stats_time(MonoDelta time) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    update_stats_time_ = time;
+  }
+
   void set_ram_anchored(uint64_t ram_anchored) {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     ram_anchored_ = ram_anchored;
   }
 
   void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     logs_retained_bytes_ = logs_retained_bytes;
   }
 
   void set_perf_improvement(uint64_t perf_improvement) {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     perf_improvement_ = perf_improvement;
   }
 
   void set_workload_score(uint64_t workload_score) {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     workload_score_ = workload_score;
   }
 
@@ -224,12 +207,22 @@ class TestMaintenanceOp : public MaintenanceOp {
   }
 
   int remaining_runs() const {
-    std::lock_guard<Mutex> guard(lock_);
+    std::lock_guard<simple_spinlock> guard(lock_);
     return remaining_runs_;
   }
 
+  uint64_t update_stats_count() const {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return update_stats_count_;
+  }
+
+  MonoTime completed_at() const {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return completed_at_;
+  }
+
  private:
-  mutable Mutex lock_;
+  mutable simple_spinlock lock_;
 
   // Latch used to help other threads wait for us to begin updating stats for
   // this op. Another thread may wait for this latch, and once the countdown is
@@ -245,7 +238,7 @@ class TestMaintenanceOp : public MaintenanceOp {
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   scoped_refptr<Histogram> maintenance_op_duration_;
-  scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
+  scoped_refptr<AtomicGauge<uint32_t>> maintenance_ops_running_;
 
   // The number of remaining times this operation will run before disabling
   // itself.
@@ -256,12 +249,82 @@ class TestMaintenanceOp : public MaintenanceOp {
   // The amount of time each op invocation will sleep.
   MonoDelta sleep_time_;
 
+  // The amount of time each UpdateStats will sleep.
+  MonoDelta update_stats_time_;
+
   // Maintenance priority.
   int32_t priority_;
 
   double workload_score_;
+
+  // Number of times the 'UpdateStats()' method is called on this instance.
+  uint64_t update_stats_count_;
+
+  // Timestamp of the monotonous clock when the operation was completed.
+  MonoTime completed_at_;
 };
 
+class MaintenanceManagerTest : public KuduTest {
+ public:
+  MaintenanceManagerTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(
+                       &metric_registry_, "test_entity")) {
+  }
+
+  void SetUp() override {
+    StartManager(2);
+  }
+
+  void TearDown() override {
+    StopManager();
+  }
+
+  void StartManager(int32_t num_threads) {
+    MaintenanceManager::Options options;
+    options.num_threads = num_threads;
+    options.polling_interval_ms = 1;
+    options.history_size = kHistorySize;
+    manager_.reset(new MaintenanceManager(options, kFakeUuid, metric_entity_));
+    manager_->set_memory_pressure_func_for_tests(
+        [&](double* /* consumption */) {
+          return indicate_memory_pressure_.load();
+        });
+    ASSERT_OK(manager_->Start());
+  }
+
+  void StopManager() {
+    manager_->Shutdown();
+  }
+
+  void WaitForSchedulerThreadRunning(const string& op_name) {
+    // Register an op whose sole purpose is to make sure the MM scheduler
+    // thread is running: wait until the op has no remaining runs left.
+    TestMaintenanceOp canary_op(op_name, MaintenanceOp::HIGH_IO_USAGE, 0);
+    canary_op.set_perf_improvement(1);
+    manager_->RegisterOp(&canary_op);
+    // Unregister 'canary_op' before it goes out of scope, so the manager never
+    // keeps a dangling pointer to this stack-allocated op.
+    SCOPED_CLEANUP({
+      manager_->UnregisterOp(&canary_op);
+    });
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(0, canary_op.remaining_runs());
+    });
+  }
+
+ protected:
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+
+  shared_ptr<MaintenanceManager> manager_;
+  std::atomic<bool> indicate_memory_pressure_ { false };
+};
+
+// Create the MaintenanceManager (in SetUp()) and shut it down (in TearDown()),
+// with nothing in between, to make sure there are no race conditions there.
+TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
+}
+
 // Create an op and wait for it to start running.  Unregister it while it is
 // running and verify that UnregisterOp waits for it to finish before
 // proceeding.
@@ -557,27 +620,15 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
   StopManager();
   StartManager(1);
 
-  // Register an op whose sole purpose is to allow us to delay the rest of this
-  // test until we know that the MM scheduler thread is running.
-  TestMaintenanceOp early_op("early", MaintenanceOp::HIGH_IO_USAGE, 0);
-  early_op.set_perf_improvement(1);
-  manager_->RegisterOp(&early_op);
-  // From this point forward if an ASSERT fires, we'll hit a CHECK failure if
-  // we don't unregister an op before it goes out of scope.
-  SCOPED_CLEANUP({
-    manager_->UnregisterOp(&early_op);
-  });
-  ASSERT_EVENTUALLY([&]() {
-    ASSERT_EQ(0, early_op.remaining_runs());
-  });
+  NO_FATALS(WaitForSchedulerThreadRunning("canary"));
 
   // The MM scheduler thread is now running. It is now safe to use
   // FLAGS_enable_maintenance_manager to temporarily disable the MM, thus
   // allowing us to register a group of ops "atomically" and ensuring the op
   // execution order that this test wants to see.
   //
-  // Without the "early op" hack above, there's a small chance that the MM
-  // scheduler thread will not have run at all at the time of
+  // Without the WaitForSchedulerThreadRunning() above, there's a small chance
+  // that the MM scheduler thread will not have run at all at the time of
   // FLAGS_enable_maintenance_manager = false, which would cause the thread
   // to exit entirely instead of sleeping.
 
@@ -675,7 +726,7 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
                             "op5",
                             "op6",
                             "op7",
-                            "early"});
+                            "canary"});
   ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size());
   for (const auto& instance : status_pb.completed_operations()) {
     ASSERT_EQ(ordered_ops.front(), instance.name());
@@ -683,4 +734,147 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
   }
 }
 
+// Check for MaintenanceManager metrics.
+TEST_F(MaintenanceManagerTest, VerifyMetrics) {
+  // Nothing has failed so far.
+  ASSERT_EQ(0, manager_->metrics_.op_prepare_failed->value());
+
+  // oppf's Prepare() zeroes its remaining runs and returns 'false', meaning
+  // the operation failed to prepare. However, its scores are set higher than
+  // those of the other two to make sure the scheduler starts working on this
+  // task before the other two.
+  class PrepareFailedMaintenanceOp : public TestMaintenanceOp {
+   public:
+    PrepareFailedMaintenanceOp()
+        : TestMaintenanceOp("oppf", MaintenanceOp::HIGH_IO_USAGE) {
+    }
+
+    bool Prepare() override {
+      set_remaining_runs(0);
+      return false;
+    }
+  } oppf;
+  oppf.set_perf_improvement(3);
+  oppf.set_workload_score(3);
+
+  TestMaintenanceOp op0("op0", MaintenanceOp::HIGH_IO_USAGE);
+  op0.set_perf_improvement(2);
+  op0.set_workload_score(2);
+  op0.set_update_stats_time(MonoDelta::FromMicroseconds(10000));
+
+  TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(1);
+  op1.set_workload_score(1);
+
+  manager_->RegisterOp(&oppf);
+  manager_->RegisterOp(&op0);
+  manager_->RegisterOp(&op1);
+  SCOPED_CLEANUP({
+    manager_->UnregisterOp(&op1);
+    manager_->UnregisterOp(&op0);
+    manager_->UnregisterOp(&oppf);
+  });
+
+  ASSERT_EVENTUALLY([&]() {
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    // Only 2 operations should successfully complete so far: op0, op1.
+    // Operation oppf should fail during Prepare().
+    ASSERT_EQ(2, status_pb.completed_operations_size());
+  });
+
+  {
+    // A sanity check: no operations should be running.
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    ASSERT_EQ(0, status_pb.running_operations_size());
+
+    ASSERT_EQ(1, op0.DurationHistogram()->TotalCount());
+    ASSERT_EQ(1, op1.DurationHistogram()->TotalCount());
+    ASSERT_EQ(0, oppf.DurationHistogram()->TotalCount());
+  }
+
+  // Exactly one operation has failed to prepare: oppf.
+  ASSERT_EQ(1, manager_->metrics_.op_prepare_failed->value());
+
+  // There should be at least 3 runs of the FindBestOp() method by this time
+  // because 3 operations have been scheduled: oppf, op0, op1,
+  // but it might be many more since the scheduler is still active.
+  ASSERT_LE(3, manager_->metrics_.op_find_duration->TotalCount());
+
+  // Max time taken by FindBestOp() should be at least 10 msec since that's
+  // the minimum duration of op0's UpdateStats().
+  ASSERT_GE(manager_->metrics_.op_find_duration->MaxValueForTests(), 10000);
+}
+
+// This test scenario verifies that the maintenance manager is able to process
+// operations with a high enough level of concurrency, even if their UpdateStats()
+// method is computationally heavy.
+TEST_F(MaintenanceManagerTest, ManyOperationsHeavyUpdateStats) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  StopManager();
+  StartManager(4);
+
+  NO_FATALS(WaitForSchedulerThreadRunning("canary"));
+
+  constexpr auto kOpsNum = 1000;
+  vector<unique_ptr<TestMaintenanceOp>> ops;
+  ops.reserve(kOpsNum);
+  for (auto i = 0; i < kOpsNum; ++i) {
+    unique_ptr<TestMaintenanceOp> op(new TestMaintenanceOp(
+        std::to_string(i), MaintenanceOp::HIGH_IO_USAGE));
+    op->set_perf_improvement(i + 1);
+    op->set_workload_score(i + 1);
+    op->set_remaining_runs(1);
+    op->set_sleep_time(MonoDelta::FromMilliseconds(i % 8 + 1));
+    op->set_update_stats_time(MonoDelta::FromMicroseconds(5));
+    ops.emplace_back(std::move(op));
+  }
+
+  // For cleaner timings, disable processing of the registered operations,
+  // and re-enable that after registering all operations.
+  FLAGS_enable_maintenance_manager = false;
+  for (auto& op : ops) {
+    manager_->RegisterOp(op.get());
+  }
+  FLAGS_enable_maintenance_manager = true;
+  MonoTime time_started = MonoTime::Now();
+
+  SCOPED_CLEANUP({
+    for (auto& op : ops) {
+      manager_->UnregisterOp(op.get());
+    }
+  });
+
+  // Given the performance improvement and workload scores assigned to the
+  // operations, the first registered one (lowest scores) should be processed
+  // last. Once it's done, no other operations should be running.
+  AssertEventually([&] {
+    ASSERT_EQ(1, ops.front()->DurationHistogram()->TotalCount());
+  }, MonoDelta::FromSeconds(60));
+
+  // A sanity check: verify no operations are left running, but all are still
+  // registered.
+  {
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    ASSERT_EQ(0, status_pb.running_operations_size());
+    ASSERT_EQ(kOpsNum, status_pb.registered_operations_size());
+  }
+
+  MonoTime time_completed = time_started;
+  for (const auto& op : ops) {
+    const auto op_time_completed = op->completed_at();
+    if (op_time_completed > time_completed) {
+      time_completed = op_time_completed;
+    }
+  }
+  const auto time_spent = time_completed - time_started;
+  LOG(INFO) << Substitute("spent $0 milliseconds to process $1 operations",
+                          time_spent.ToMilliseconds(), kOpsNum);
+  LOG(INFO) << Substitute("number of UpdateStats() calls per operation: $0",
+                          ops.front()->update_stats_count());
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index f14c498..2e1f48b 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -340,6 +340,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
   FRIEND_TEST(MaintenanceManagerTest, 
TestPrioritizeLogRetentionUnderMemoryPressure);
   FRIEND_TEST(MaintenanceManagerTest, TestOpFactors);
+  FRIEND_TEST(MaintenanceManagerTest, VerifyMetrics);
 
   typedef std::map<MaintenanceOp*, MaintenanceOpStats,
           MaintenanceOpComparator> OpMapType;

Reply via email to