This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 483807a  KUDU-3149: don't block op registration on MM mutex
483807a is described below

commit 483807a622bbb10558f9905e87ab0ab1b7abca35
Author: Andrew Wong <[email protected]>
AuthorDate: Sat Oct 10 20:57:09 2020 -0700

    KUDU-3149: don't block op registration on MM mutex
    
    The maintenance manager's 'lock_' is a mutex that is taken upon access
    to 'ops_', notably when iterating through 'ops_' to find the best op to
    run. This critical section can last quite a while, since finding the
    best op entails computing stats for each op, which is expensive for
    compactions. While the lock is held, op registration (and thus tablet
    bootstrapping) is blocked.
    
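    This patch addresses the issue by having RegisterOp() buffer ops in a
    separate op map protected by its own spinlock, and merging that map into
    'ops_' whenever 'lock_' is held: at the start of each scheduler
    iteration, when dumping the maintenance manager's status, and
    immediately within RegisterOp() if 'lock_' can be taken without
    blocking.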
    
    I added a unit test for the maintenance manager change, and a
    single-node benchmark in which many tablets bootstrap while contending
    with compactions. I ran the benchmark with and without op buffering (via
    a --buffer_op_registration flag that has since been removed from this
    patch), with the following results:
    
    [awong@va1022 release]$ for i in {1..5}; do ./bin/tablet_server-test --gtest_filter=*WithConcurrent* --num_tablets=300 --num_rowsets_per_tablet=30 --buffer_op_registration=true |& grep "waiting for all bootstraps to finish"; done
    I1014 02:19:25.452993 80415 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 0.787s    user 0.000s     sys 0.002s
    I1014 02:19:43.039741 82020 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 0.641s    user 0.000s     sys 0.004s
    I1014 02:20:02.907203 83608 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 0.769s    user 0.001s     sys 0.002s
    I1014 02:20:21.779570 85260 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 0.758s    user 0.002s     sys 0.001s
    I1014 02:20:40.687155 86874 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 0.682s    user 0.001s     sys 0.002s
    
    Average real time with op buffering: 0.727s
    
    [awong@va1022 release]$ for i in {1..5}; do ./bin/tablet_server-test --gtest_filter=*WithConcurrent* --num_tablets=300 --num_rowsets_per_tablet=30 --buffer_op_registration=false |& grep "waiting for all bootstraps to finish"; done
    I1014 02:21:13.666689 88559 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 1.538s    user 0.002s     sys 0.001s
    I1014 02:21:33.119479 90172 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 1.316s    user 0.001s     sys 0.002s
    I1014 02:21:53.929062 91816 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 1.393s    user 0.003s     sys 0.001s
    I1014 02:22:12.764689 93439 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 1.356s    user 0.003s     sys 0.000s
    I1014 02:22:31.138669 95042 tablet_server-test.cc:4405] Time spent waiting for all bootstraps to finish: real 1.516s    user 0.000s     sys 0.003s
    
    Average real time without op buffering: 1.424s
    
    Change-Id: I4a1b810f5b7ff6a22acc9b10b79ddffa8085c990
    Reviewed-on: http://gerrit.cloudera.org:8080/16580
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tserver/tablet_server-test.cc    | 42 +++++++++++++++++++
 src/kudu/util/maintenance_manager-test.cc | 52 ++++++++++++++++++++++-
 src/kudu/util/maintenance_manager.cc      | 68 +++++++++++++++++++++----------
 src/kudu/util/maintenance_manager.h       | 31 +++++++++++---
 4 files changed, 164 insertions(+), 29 deletions(-)

diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index eb1cd06..514ab7f 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -111,7 +111,9 @@
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/oid_generator.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -165,6 +167,9 @@ DEFINE_int32(single_threaded_insert_latency_bench_insert_rows, 1000,
 DEFINE_int32(delete_tablet_bench_num_flushes, 200,
              "Number of disk row sets to flush in the delete tablet benchmark");
 
+DEFINE_int32(num_tablets, 5, "Number of tablets to create for tests");
+DEFINE_int32(num_rowsets_per_tablet, 10, "Number of rowsets to create per tablet");
+
 DECLARE_bool(crash_on_eio);
 DECLARE_bool(enable_flush_deltamemstores);
 DECLARE_bool(enable_flush_memrowset);
@@ -4336,6 +4341,8 @@ TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
 // older than the flush threshold, we will schedule DMS flushes.
 TEST_F(TabletServerTest, TestTimeBasedFlushDMS) {
   SKIP_IF_SLOW_NOT_ALLOWED();
+  FLAGS_enable_maintenance_manager = false;
+  NO_FATALS(ShutdownAndRebuildTablet());
   // We're going to generate a bunch of DMSs, and for that, we need multiple
   // rowsets, so disable merge compactions.
   FLAGS_enable_rowset_compaction = false;
@@ -4367,6 +4374,41 @@ TEST_F(TabletServerTest, TestTimeBasedFlushDMS) {
   ASSERT_TRUE(tablet_replica_->tablet()->DeltaMemRowSetEmpty());
 }
 
+TEST_F(TabletServerTest, StartupWithConcurrentOpsBenchmark) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  FLAGS_enable_maintenance_manager = false;
+  NO_FATALS(ShutdownAndRebuildTablet());
+
+  // Create a bunch of tablets with a bunch of rowsets.
+  ObjectIdGenerator oid;
+  for (int i = 0; i < FLAGS_num_tablets; i++) {
+    const auto tablet_id = oid.Next();
+    ASSERT_OK(mini_server_->AddTestTablet(kTableId, tablet_id, schema_));
+    ASSERT_OK(WaitForTabletRunning(tablet_id.c_str()));
+    for (int r = 0; r < FLAGS_num_rowsets_per_tablet; r++) {
+      NO_FATALS(InsertTestRowsDirect(r, 1, tablet_id));
+      scoped_refptr<TabletReplica> replica;
+      ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(tablet_id, &replica));
+      ASSERT_OK(replica->tablet()->Flush());
+    }
+  }
+  ShutdownTablet();
+  // With a large number of tablets and rowsets, the maintenance manager
+  // scheduler thread will spend a lot of time computing the best op to
+  // perform. With many maintenance threads, finding the best op may contend
+  // with on-going compactions. Per KUDU-3149, this shouldn't interfere with op
+  // registration and bootstrapping.
+  FLAGS_enable_maintenance_manager = true;
+  FLAGS_maintenance_manager_polling_interval_ms = 1;
+  FLAGS_maintenance_manager_num_threads = 10;
+  mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"),
+                                          HostPort("127.0.0.1", 0), 1));
+  ASSERT_OK(mini_server_->Start());
+  LOG_TIMING(INFO, "waiting for all bootstraps to finish") {
+    ASSERT_OK(mini_server_->server()->tablet_manager()->WaitForAllBootstrapsToFinish());
+  }
+}
+
 TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   FLAGS_enable_maintenance_manager = true;
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 84bcc7a..e43e60d 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -36,6 +36,7 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/maintenance_manager.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -113,8 +114,12 @@ class TestMaintenanceOp : public MaintenanceOp {
  public:
   TestMaintenanceOp(const std::string& name,
                     IOUsage io_usage,
-                    int32_t priority = 0)
+                    int32_t priority = 0,
+                    CountDownLatch* start_stats_latch = nullptr,
+                    CountDownLatch* continue_stats_latch = nullptr)
     : MaintenanceOp(name, io_usage),
+      start_stats_latch_(start_stats_latch),
+      continue_stats_latch_(continue_stats_latch),
       ram_anchored_(500),
       logs_retained_bytes_(0),
       perf_improvement_(0),
@@ -156,6 +161,10 @@ class TestMaintenanceOp : public MaintenanceOp {
   }
 
   void UpdateStats(MaintenanceOpStats* stats) override {
+    if (start_stats_latch_) {
+      start_stats_latch_->CountDown();
+      DCHECK_NOTNULL(continue_stats_latch_)->Wait();
+    }
     std::lock_guard<Mutex> guard(lock_);
     stats->set_runnable(remaining_runs_ > 0);
     stats->set_ram_anchored(ram_anchored_);
@@ -214,6 +223,14 @@ class TestMaintenanceOp : public MaintenanceOp {
  private:
   mutable Mutex lock_;
 
+  // Latch used to help other threads wait for us to begin updating stats for
+  // this op. Another thread may wait for this latch; once the countdown is
+  // complete, the maintenance manager lock is held while computing stats,
+  // and the scheduler thread blocks until 'continue_stats_latch_' is
+  // counted down.
+  CountDownLatch* start_stats_latch_;
+  CountDownLatch* continue_stats_latch_;
+
   uint64_t ram_anchored_;
   uint64_t logs_retained_bytes_;
   uint64_t perf_improvement_;
@@ -255,6 +272,39 @@ TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
   manager_->UnregisterOp(&op1);
 }
 
+TEST_F(MaintenanceManagerTest, TestRegisterUnregisterWithContention) {
+  CountDownLatch start_latch(1);
+  CountDownLatch continue_latch(1);
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, 0, &start_latch, &continue_latch);
+  manager_->RegisterOp(&op1);
+  // Wait for the maintenance manager to start updating stats for this op.
+  // This will effectively block the maintenance manager lock until
+  // 'continue_latch' counts down.
+  start_latch.Wait();
+
+  // Register another op while the maintenance manager lock is held.
+  TestMaintenanceOp op2("2", MaintenanceOp::HIGH_IO_USAGE);
+  manager_->RegisterOp(&op2);
+  TestMaintenanceOp op3("3", MaintenanceOp::HIGH_IO_USAGE);
+  manager_->RegisterOp(&op3);
+
+  // Allow UpdateStats() to complete and release the maintenance manager lock.
+  continue_latch.CountDown();
+
+  // We should be able to unregister an op even though, because of the forced
+  // lock contention, it may have been added to the list of ops pending
+  // registration instead of "fully registered" ops.
+  manager_->UnregisterOp(&op3);
+
+  // Even though we blocked registration, when we dump, we'll take the
+  // maintenance manager lock and merge ops that were pending registration.
+  MaintenanceManagerStatusPB status_pb;
+  manager_->GetMaintenanceManagerStatusDump(&status_pb);
+  ASSERT_EQ(2, status_pb.registered_operations_size());
+  manager_->UnregisterOp(&op1);
+  manager_->UnregisterOp(&op2);
+}
+
 // Regression test for KUDU-1495: when an operation is being unregistered,
 // new instances of that operation should not be scheduled.
 TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index b79d49e..536fedb 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -228,18 +228,36 @@ void MaintenanceManager::Shutdown() {
   }
 }
 
+void MaintenanceManager::MergePendingOpRegistrationsUnlocked() {
+  lock_.AssertAcquired();
+  OpMapType ops_to_register;
+  {
+    std::lock_guard<simple_spinlock> l(registration_lock_);
+    ops_to_register = std::move(ops_pending_registration_);
+    ops_pending_registration_.clear();
+  }
+  for (auto& op_and_stats : ops_to_register) {
+    auto* op = op_and_stats.first;
+    op->cond_.reset(new ConditionVariable(&lock_));
+    VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Registered " << op->name();
+  }
+  ops_.insert(ops_to_register.begin(), ops_to_register.end());
+}
+
 void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
   CHECK(op);
-  std::lock_guard<Mutex> guard(lock_);
-  CHECK(!op->manager_) << "Tried to register " << op->name()
-                       << ", but it is already registered.";
-  pair<OpMapTy::iterator, bool> val
-    (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
-  CHECK(val.second) << "Tried to register " << op->name()
-                    << ", but it already exists in ops_.";
-  op->manager_ = shared_from_this();
-  op->cond_.reset(new ConditionVariable(&lock_));
-  VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Registered " << op->name();
+  {
+    std::lock_guard<simple_spinlock> l(registration_lock_);
+    CHECK(!op->manager_) << "Tried to register " << op->name()
+                        << ", but it is already registered.";
+    EmplaceOrDie(&ops_pending_registration_, op, MaintenanceOpStats());
+    op->manager_ = shared_from_this();
+  }
+  // If we can take 'lock_', add to 'ops_' immediately.
+  if (lock_.try_lock()) {
+    MergePendingOpRegistrationsUnlocked();
+    lock_.unlock();
+  }
 }
 
 void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
@@ -247,23 +265,23 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
     std::lock_guard<Mutex> guard(lock_);
     CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
         << ", but it is not currently registered with this maintenance manager.";
-    auto iter = ops_.find(op);
-    CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
-                              << ", but it was never registered";
+
     // While the op is running, wait for it to be finished.
-    if (iter->first->running_ > 0) {
-      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1) << "Waiting for op " << op->name()
-                                                   << " to finish so we can unregister it.";
+    if (op->running_ > 0) {
+      VLOG_AND_TRACE_WITH_PREFIX("maintenance", 1)
+          << Substitute("Waiting for op $0 to finish so we can unregister it", op->name());
     }
     op->CancelAndDisable();
-    while (iter->first->running_ > 0) {
+    while (op->running_ > 0) {
       op->cond_->Wait();
-      iter = ops_.find(op);
-      CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
-          << ", but another thread unregistered it while we were "
-          << "waiting for it to complete";
     }
-    ops_.erase(iter);
+    // Remove the op from 'ops_', and if it wasn't there, erase it from
+    // 'ops_pending_registration_'.
+    if (ops_.erase(op) == 0) {
+      std::lock_guard<simple_spinlock> l(registration_lock_);
+      const auto num_erased_ops = ops_pending_registration_.erase(op);
+      CHECK_GT(num_erased_ops, 0);
+    }
   }
   VLOG_WITH_PREFIX(1) << "Unregistered op " << op->name();
   op->cond_.reset();
@@ -289,6 +307,11 @@ void MaintenanceManager::RunSchedulerThread() {
   bool prev_iter_found_no_work = false;
 
   while (true) {
+    // Upon each iteration, we should have dropped and reacquired 'lock_'.
+    // Register any ops that may have been buffered for registration while the
+    // lock was last held.
+    MergePendingOpRegistrationsUnlocked();
+
     // We'll keep sleeping if:
     //    1) there are no free threads available to perform a maintenance op.
     // or 2) we just tried to schedule an op but found nothing to run.
@@ -554,6 +577,7 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
 void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
   DCHECK(out_pb != nullptr);
   std::lock_guard<Mutex> guard(lock_);
+  MergePendingOpRegistrationsUnlocked();
   for (const auto& val : ops_) {
   MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
     MaintenanceOp* op(val.first);
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 9bccc9e..faf5382 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -34,20 +34,21 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/condition_variable.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-template<class T>
-class AtomicGauge;
 class Histogram;
 class MaintenanceManager;
 class MaintenanceManagerStatusPB;
 class MaintenanceManagerStatusPB_OpInstancePB;
 class Thread;
 class ThreadPool;
+template<class T>
+class AtomicGauge;
 
 class MaintenanceOpStats {
  public:
@@ -306,14 +307,19 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   Status Start();
   void Shutdown();
 
-  // Register an op with the manager.
+  // Register an op with the manager. If the op cannot be put into 'ops_'
+  // immediately because 'lock_' is currently held, the op is buffered in
+  // 'ops_pending_registration_' until MergePendingOpRegistrationsUnlocked()
+  // moves it into 'ops_' the next time 'lock_' is taken.
   void RegisterOp(MaintenanceOp* op);
 
   // Unregister an op with the manager.
-  // If the Op is currently running, it will not be interrupted.  However, this
+  // If the Op is currently running, it will not be interrupted. However, this
   // function will block until the Op is finished.
   void UnregisterOp(MaintenanceOp* op);
 
+  // Fill out 'out_pb' based on the registered ops, merging in any ops pending
+  // registration in case any exist.
   void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
 
   void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) {
@@ -329,7 +335,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   FRIEND_TEST(MaintenanceManagerTest, TestOpFactors);
 
   typedef std::map<MaintenanceOp*, MaintenanceOpStats,
-          MaintenanceOpComparator> OpMapTy;
+          MaintenanceOpComparator> OpMapType;
 
   // Return true if tests have currently disabled the maintenance
   // manager by way of changing the gflags at runtime.
@@ -360,9 +366,22 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   void IncreaseOpCount(MaintenanceOp *op);
   void DecreaseOpCount(MaintenanceOp *op);
 
+  // Adds ops in 'ops_pending_registration_' to 'ops_'. Must be called while
+  // 'lock_' is held.
+  void MergePendingOpRegistrationsUnlocked();
+
   const std::string server_uuid_;
   const int32_t num_threads_;
-  OpMapTy ops_; // Registered operations.
+
+  // Ops for which RegisterOp() has been called, but that have not yet been
+  // added to 'ops_'. Since adding to 'ops_' requires taking 'lock_', rather
+  // than blocking registration (and bootstrapping), this serves as a buffer to
+  // be added to 'ops_' once the lock is available. If this lock is taken
+  // concurrently with 'lock_', 'lock_' must be taken first.
+  simple_spinlock registration_lock_;
+  OpMapType ops_pending_registration_;
+
+  OpMapType ops_; // Registered operations.
   Mutex lock_;
   scoped_refptr<kudu::Thread> monitor_thread_;
   std::unique_ptr<ThreadPool> thread_pool_;

Reply via email to