This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 713879ab3dfd055fbaede1741e05e2696081c497 Author: Todd Lipcon <[email protected]> AuthorDate: Thu Jul 9 17:51:43 2020 -0700 tablet: acquire/release locks in batches Prior to this patch, each row operation required a separate call into the LockManager to acquire or release its corresponding row lock. This required atomic operations on the LockManager, each of which would bounce the cacheline containing the spinlock(s) between the prepare thread (acquiring locks) and the apply threads (releasing locks). Atomic operations on cachelines in remote cores are quite expensive, so this caused quite a lot of CPU usage (locking-related methods took more CPU than the actual work on the prepare/apply threads). This patch changes the LockManager API somewhat to allow acquiring/releasing the locks in bulk. WriteOp now contains a single ScopedRowLock object to hold all of the required locks, instead of a separate ScopedRowLock per row. The acquire/release paths are also optimized now to get better instruction-level parallelism in the hashtable lookups -- prefetches are used to bring the appropriate cache lines into CPU cache prior to reading them. This patch also simplifies the locking to use a single LockManager-wide lock instead of per-bucket locks. Per-bucket locks would still be possible but don't seem to be beneficial, and would complicate the codepath quite a bit. Removing them also saves a significant amount of memory in the LockManager hash tables. 
Benchmarked by running: $ perf stat ./build/release/bin/kudu tserver run -fs-wal-dir /tmp/ts \ -enable_maintenance_manager=0 -unlock-unsafe-flags and $ kudu perf loadgen localhost -num_rows_per_thread=10000000 -num_threads=8 This workload ends up somewhat client-bound on my machine, so wall-clock time isn't improved much, but the tserver CPU consumption is reduced by about 18% (task-clock drops from 177786.37 msec to 145986.70 msec): Before: Performance counter stats for './build/release/bin/kudu tserver run -fs-wal-dir /tmp/ts -enable_maintenance_manager=0 -unlock-unsafe-flags': 177786.37 msec task-clock # 7.648 CPUs utilized 215350 context-switches # 0.001 M/sec 55653 cpu-migrations # 0.313 K/sec 3219417 page-faults # 0.018 M/sec 724369004287 cycles # 4.074 GHz (83.32%) 142799406201 stalled-cycles-frontend # 19.71% frontend cycles idle (83.36%) 109903159567 stalled-cycles-backend # 15.17% backend cycles idle (83.40%) 719706215568 instructions # 0.99 insn per cycle # 0.20 stalled cycles per insn (83.30%) 131445739053 branches # 739.347 M/sec (83.31%) 479779584 branch-misses # 0.37% of all branches (83.32%) 165.187526000 seconds user 12.866637000 seconds sys After: Performance counter stats for './build/release/bin/kudu tserver run -fs-wal-dir /tmp/ts -enable_maintenance_manager=0 -unlock-unsafe-flags': 145986.70 msec task-clock # 6.063 CPUs utilized 202600 context-switches # 0.001 M/sec 51442 cpu-migrations # 0.352 K/sec 3368490 page-faults # 0.023 M/sec 597915142173 cycles # 4.096 GHz (83.51%) 58333772996 stalled-cycles-frontend # 9.76% frontend cycles idle (83.35%) 104785221789 stalled-cycles-backend # 17.53% backend cycles idle (83.18%) 690655964982 instructions # 1.16 insn per cycle # 0.15 stalled cycles per insn (83.38%) 126988529873 branches # 869.864 M/sec (83.40%) 469031328 branch-misses # 0.37% of all branches (83.17%) 134.072172000 seconds user 12.192747000 seconds sys Change-Id: I3cb724e953ecdf188a35181c2f91b721b3416524 Reviewed-on: http://gerrit.cloudera.org:8080/16169 Tested-by: Todd Lipcon <[email 
protected]> Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/tablet/lock_manager-test.cc | 109 +++++++-------- src/kudu/tablet/lock_manager.cc | 249 ++++++++++++++++++++--------------- src/kudu/tablet/lock_manager.h | 55 ++++---- src/kudu/tablet/ops/write_op.cc | 17 ++- src/kudu/tablet/ops/write_op.h | 7 + src/kudu/tablet/row_op.h | 9 -- src/kudu/tablet/tablet.cc | 30 ++--- src/kudu/tablet/tablet.h | 23 +--- 8 files changed, 252 insertions(+), 247 deletions(-) diff --git a/src/kudu/tablet/lock_manager-test.cc b/src/kudu/tablet/lock_manager-test.cc index 426523d..d2d9005 100644 --- a/src/kudu/tablet/lock_manager-test.cc +++ b/src/kudu/tablet/lock_manager-test.cc @@ -32,6 +32,7 @@ #include "kudu/gutil/macros.h" #include "kudu/gutil/stringprintf.h" +#include "kudu/util/array_view.h" #include "kudu/util/env.h" #include "kudu/util/slice.h" #include "kudu/util/stopwatch.h" @@ -58,56 +59,66 @@ class LockManagerTest : public KuduTest { public: void VerifyAlreadyLocked(const Slice& key) { LockEntry *entry; - ASSERT_EQ(LockManager::LOCK_BUSY, - lock_manager_.TryLock(key, kFakeTransaction, LockManager::LOCK_EXCLUSIVE, &entry)); + ASSERT_FALSE(lock_manager_.TryLock(key, kFakeTransaction, &entry)); } LockManager lock_manager_; }; TEST_F(LockManagerTest, TestLockUnlockSingleRow) { - Slice key_a("a"); - ScopedRowLock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); - ScopedRowLock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); - ScopedRowLock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); + Slice key_a[] = {"a"}; + for (int i = 0; i < 3; i++) { + ScopedRowLock l(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); + } } // Test if the same transaction locks the same row multiple times. 
TEST_F(LockManagerTest, TestMultipleLockSameRow) { - Slice key_a("a"); + Slice key_a[] = {"a"}; ScopedRowLock first_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); - ASSERT_EQ(LockManager::LOCK_ACQUIRED, first_lock.GetLockStatusForTests()); - VerifyAlreadyLocked(key_a); + ASSERT_TRUE(first_lock.acquired()); + VerifyAlreadyLocked(key_a[0]); { ScopedRowLock second_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); - ASSERT_EQ(LockManager::LOCK_ACQUIRED, second_lock.GetLockStatusForTests()); - VerifyAlreadyLocked(key_a); + ASSERT_TRUE(second_lock.acquired()); + VerifyAlreadyLocked(key_a[0]); } - ASSERT_EQ(LockManager::LOCK_ACQUIRED, first_lock.GetLockStatusForTests()); - VerifyAlreadyLocked(key_a); + ASSERT_TRUE(first_lock.acquired()); + VerifyAlreadyLocked(key_a[0]); } TEST_F(LockManagerTest, TestLockUnlockMultipleRows) { - Slice key_a("a"), key_b("b"); + Slice key_a[] = {"a"}; + Slice key_b[] = {"b"}; for (int i = 0; i < 3; ++i) { ScopedRowLock l1(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); ScopedRowLock l2(&lock_manager_, kFakeTransaction, key_b, LockManager::LOCK_EXCLUSIVE); - VerifyAlreadyLocked(key_a); - VerifyAlreadyLocked(key_b); + VerifyAlreadyLocked(key_a[0]); + VerifyAlreadyLocked(key_b[0]); + } +} + +TEST_F(LockManagerTest, TestLockBatch) { + vector<Slice> keys = {"a", "b", "c"}; + { + ScopedRowLock l1(&lock_manager_, kFakeTransaction, keys, LockManager::LOCK_EXCLUSIVE); + for (const auto& k : keys) { + VerifyAlreadyLocked(k); + } } } TEST_F(LockManagerTest, TestRelockSameRow) { - Slice key_a("a"); + Slice key_a[] = {"a"}; ScopedRowLock row_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); - VerifyAlreadyLocked(key_a); + VerifyAlreadyLocked(key_a[0]); } TEST_F(LockManagerTest, TestMoveLock) { // Acquire a lock. 
- Slice key_a("a"); + Slice key_a[] = {"a"}; ScopedRowLock row_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE); ASSERT_TRUE(row_lock.acquired()); @@ -119,15 +130,9 @@ TEST_F(LockManagerTest, TestMoveLock) { class LmTestResource { public: - explicit LmTestResource(const Slice* id) - : id_(id), - owner_(0), - is_owned_(false) { - } + explicit LmTestResource(Slice id) : id_(id), owner_(0), is_owned_(false) {} - const Slice* id() const { - return id_; - } + Slice id() const { return id_; } void acquire(uint64_t tid) { std::unique_lock<std::mutex> lock(lock_); @@ -148,7 +153,7 @@ class LmTestResource { private: DISALLOW_COPY_AND_ASSIGN(LmTestResource); - const Slice* id_; + const Slice id_; std::mutex lock_; uint64_t owner_; bool is_owned_; @@ -156,9 +161,8 @@ class LmTestResource { class LmTestThread { public: - LmTestThread(LockManager* manager, vector<const Slice*> keys, - const vector<LmTestResource*> resources) - : manager_(manager), keys_(std::move(keys)), resources_(resources) {} + LmTestThread(LockManager* manager, vector<Slice> keys, vector<LmTestResource*> resources) + : manager_(manager), keys_(std::move(keys)), resources_(std::move(resources)) {} void Start() { thread_ = thread([this]() { this->Run(); }); @@ -168,15 +172,9 @@ class LmTestThread { tid_ = Env::Default()->gettid(); const OpState* my_txn = reinterpret_cast<OpState*>(tid_); - std::sort(keys_.begin(), keys_.end()); + std::sort(keys_.begin(), keys_.end(), Slice::Comparator()); for (int i = 0; i < FLAGS_num_iterations; i++) { - std::vector<shared_ptr<ScopedRowLock> > locks; - // TODO: We don't have an API for multi-row - for (const Slice* key : keys_) { - locks.push_back(std::make_shared<ScopedRowLock>( - manager_, my_txn, *key, LockManager::LOCK_EXCLUSIVE)); - } - + ScopedRowLock l(manager_, my_txn, keys_, LockManager::LOCK_EXCLUSIVE); for (LmTestResource* r : resources_) { r->acquire(tid_); } @@ -193,14 +191,14 @@ class LmTestThread { private: 
DISALLOW_COPY_AND_ASSIGN(LmTestThread); LockManager* manager_; - vector<const Slice*> keys_; + vector<Slice> keys_; const vector<LmTestResource*> resources_; uint64_t tid_; thread thread_; }; -static void runPerformanceTest(const char *test_type, - vector<shared_ptr<LmTestThread> > *threads) { +static void RunPerformanceTest(const char* test_type, + vector<shared_ptr<LmTestThread> >* threads) { Stopwatch sw(Stopwatch::ALL_THREADS); sw.start(); for (const shared_ptr<LmTestThread>& t : *threads) { @@ -233,12 +231,9 @@ static void runPerformanceTest(const char *test_type, // Test running a bunch of threads at once that want an overlapping set of // resources. TEST_F(LockManagerTest, TestContention) { - Slice slice_a("a"); - LmTestResource resource_a(&slice_a); - Slice slice_b("b"); - LmTestResource resource_b(&slice_b); - Slice slice_c("c"); - LmTestResource resource_c(&slice_c); + LmTestResource resource_a("a"); + LmTestResource resource_b("b"); + LmTestResource resource_c("c"); vector<shared_ptr<LmTestThread> > threads; for (int i = 0; i < FLAGS_num_test_threads; ++i) { vector<LmTestResource*> resources; @@ -252,15 +247,14 @@ TEST_F(LockManagerTest, TestContention) { resources.push_back(&resource_c); resources.push_back(&resource_a); } - vector<const Slice*> keys; - for (vector<LmTestResource*>::const_iterator r = resources.begin(); - r != resources.end(); ++r) { - keys.push_back((*r)->id()); + vector<Slice> keys; + for (LmTestResource* r : resources) { + keys.push_back(r->id()); } threads.push_back(std::make_shared<LmTestThread>( &lock_manager_, keys, resources)); } - runPerformanceTest("Contended", &threads); + RunPerformanceTest("Contended", &threads); } // Test running a bunch of threads at once that want different @@ -276,19 +270,18 @@ TEST_F(LockManagerTest, TestUncontended) { } vector<shared_ptr<LmTestResource> > resources; for (int i = 0; i < FLAGS_num_test_threads; i++) { - resources.push_back( - std::make_shared<LmTestResource>(&slices[i])); + 
resources.push_back(std::make_shared<LmTestResource>(slices[i])); } vector<shared_ptr<LmTestThread> > threads; for (int i = 0; i < FLAGS_num_test_threads; ++i) { - vector<const Slice*> k; - k.push_back(&slices[i]); + vector<Slice> k; + k.push_back(slices[i]); vector<LmTestResource*> r; r.push_back(resources[i].get()); threads.push_back(std::make_shared<LmTestThread>( &lock_manager_, k, r)); } - runPerformanceTest("Uncontended", &threads); + RunPerformanceTest("Uncontended", &threads); } } // namespace tablet diff --git a/src/kudu/tablet/lock_manager.cc b/src/kudu/tablet/lock_manager.cc index aa29657..01667fe 100644 --- a/src/kudu/tablet/lock_manager.cc +++ b/src/kudu/tablet/lock_manager.cc @@ -17,19 +17,22 @@ #include "kudu/tablet/lock_manager.h" +#include <cstddef> #include <cstdint> #include <memory> #include <mutex> #include <ostream> #include <string> +#include <utility> +#include <vector> #include <glog/logging.h> -#include "kudu/gutil/atomicops.h" #include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/hash/city.h" #include "kudu/gutil/port.h" #include "kudu/gutil/walltime.h" +#include "kudu/util/array_view.h" #include "kudu/util/faststring.h" #include "kudu/util/locks.h" #include "kudu/util/logging.h" @@ -37,9 +40,10 @@ #include "kudu/util/semaphore.h" #include "kudu/util/trace.h" -using base::subtle::NoBarrier_Load; using std::string; +using std::unique_lock; using std::unique_ptr; +using std::vector; namespace kudu { namespace tablet { @@ -70,6 +74,8 @@ class LockEntry { return KUDU_REDACT(key_.ToDebugString()); } + void Unlock(); + // Mutex used by the LockManager Semaphore sem; int recursion_; @@ -105,7 +111,6 @@ class LockEntry { class LockTable { private: struct Bucket { - simple_spinlock lock; // First entry chained from this bucket, or NULL if the bucket is empty. 
LockEntry *chain_head; Bucket() : chain_head(nullptr) {} @@ -118,7 +123,7 @@ class LockTable { ~LockTable() { // Sanity checks: The table shouldn't be destructed when there are any entries in it. - DCHECK_EQ(0, NoBarrier_Load(&(item_count_))) << "There are some unreleased locks"; + DCHECK_EQ(0, item_count_) << "There are some unreleased locks"; for (size_t i = 0; i < size_; ++i) { for (LockEntry *p = buckets_[i].chain_head; p != nullptr; p = p->ht_next_) { DCHECK(p == nullptr) << "The entry " << p->ToString() << " was not released"; @@ -126,8 +131,10 @@ class LockTable { } } - LockEntry *GetLockEntry(const Slice &key); - void ReleaseLockEntry(LockEntry *entry); + vector<LockEntry*> GetLockEntries(ArrayView<Slice> keys); + LockEntry* GetLockEntry(Slice key); + + void ReleaseLockEntries(ArrayView<LockEntry*> entries); private: Bucket *FindBucket(uint64_t hash) const { @@ -160,85 +167,123 @@ class LockTable { void Resize(); private: - // table rwlock used as write on resize - percpu_rwlock lock_; + simple_spinlock lock_; // size - 1 used to lookup the bucket (hash & mask_) uint64_t mask_; // number of buckets in the table uint64_t size_; + // number of items in the table + int64_t item_count_; // table buckets unique_ptr<Bucket[]> buckets_; - // number of items in the table - base::subtle::Atomic64 item_count_; }; -LockEntry *LockTable::GetLockEntry(const Slice& key) { - auto new_entry = new LockEntry(key); - LockEntry *old_entry; +vector<LockEntry*> LockTable::GetLockEntries(ArrayView<Slice> keys) { + vector<LockEntry*> entries; + entries.resize(keys.size()); + for (int i = 0; i < keys.size(); i++) { + entries[i] = new LockEntry(keys[i]); + } + + vector<LockEntry*> to_delete; + // TODO(todd) prefetch the hash buckets { - shared_lock<rw_spinlock> l(lock_.get_lock()); - Bucket *bucket = FindBucket(new_entry->key_hash_); - { - std::lock_guard<simple_spinlock> bucket_lock(bucket->lock); + unique_lock<simple_spinlock> l(lock_); + for (int i = 0; i < entries.size(); i++) { 
+ LockEntry* new_entry = entries[i]; + Bucket* bucket = FindBucket(new_entry->key_hash_); LockEntry **node = FindSlot(bucket, new_entry->key_, new_entry->key_hash_); - old_entry = *node; - if (old_entry != nullptr) { + LockEntry* old_entry = *node; + if (PREDICT_FALSE(old_entry != nullptr)) { old_entry->refs_++; + to_delete.push_back(entries[i]); + entries[i] = old_entry; } else { new_entry->ht_next_ = nullptr; new_entry->CopyKey(); *node = new_entry; + ++item_count_; + + if (PREDICT_FALSE(item_count_ > size_)) { + Resize(); + } } } } - if (old_entry != nullptr) { - delete new_entry; - return old_entry; - } + for (auto* e : to_delete) delete e; - if (base::subtle::NoBarrier_AtomicIncrement(&item_count_, 1) > size_) { - std::unique_lock<percpu_rwlock> table_wrlock(lock_, std::try_to_lock); - // if we can't take the lock, means that someone else is resizing. - // (The percpu_rwlock try_lock waits for readers to complete) - if (table_wrlock.owns_lock()) { - Resize(); - } - } + return entries; +} - return new_entry; +LockEntry* LockTable::GetLockEntry(Slice key) { + vector<LockEntry*> entries = GetLockEntries({&key, 1}); + return entries[0]; } -void LockTable::ReleaseLockEntry(LockEntry *entry) { - bool removed = false; +void LockTable::ReleaseLockEntries(ArrayView<LockEntry*> entries) { + // Construct a linked list co-opting the ht_next pointers of the entries + // to keep track of which objects need to be deleted. 
+ LockEntry* removed_head = nullptr; + + const auto& RemoveEntryFromBucket = [&](Bucket* bucket, LockEntry* entry) { + LockEntry** node = FindEntry(bucket, entry); + if (PREDICT_TRUE(node != nullptr)) { + if (--entry->refs_ > 0) return; + + *node = entry->ht_next_; + entry->ht_next_ = removed_head; + removed_head = entry; + item_count_--; + } else { + LOG(DFATAL) << "Unable to find LockEntry on release"; + } + }; + { - std::lock_guard<rw_spinlock> table_rdlock(lock_.get_lock()); - Bucket *bucket = FindBucket(entry->key_hash_); - { - std::lock_guard<simple_spinlock> bucket_lock(bucket->lock); - LockEntry **node = FindEntry(bucket, entry); - if (node != nullptr) { - // ASSUMPTION: There are few updates, so locking the same row at the same time is rare - // TODO: Move out this if we're going with the TryLock - if (--entry->refs_ > 0) - return; - - *node = entry->ht_next_; - removed = true; + unique_lock<simple_spinlock> l(lock_); + + auto it = entries.begin(); + int rem = entries.size(); + + // Manually block the loop into a series of constant-sized batches + // followed by one last variable-sized batch for the remainder. + // + // The batch size was experimentally determined. + static constexpr int kBatchSize = 16; + LockEntry* batch[kBatchSize]; + Bucket* buckets[kBatchSize]; + const auto& ProcessBatch = [&](int n) { + for (int i = 0; i < n; i++) { + batch[i] = *it++; + buckets[i] = FindBucket(batch[i]->key_hash_); + prefetch(reinterpret_cast<const char*>(buckets[i]), PREFETCH_HINT_T0); + } + for (int i = 0; i < n; i++) { + RemoveEntryFromBucket(buckets[i], batch[i]); } + }; + + while (rem >= kBatchSize) { + ProcessBatch(kBatchSize); + rem -= kBatchSize; } + ProcessBatch(rem); } - DCHECK(removed) << "Unable to find LockEntry on release"; - base::subtle::NoBarrier_AtomicIncrement(&item_count_, -1); - delete entry; + // Actually free the memory outside the lock. 
+ while (removed_head) { + auto* tmp = removed_head; + removed_head = removed_head->ht_next_; + delete tmp; + } } void LockTable::Resize() { // Calculate a new table size size_t new_size = 16; - while (new_size < base::subtle::NoBarrier_Load(&item_count_)) { + while (new_size < item_count_) { new_size <<= 1; } @@ -274,21 +319,13 @@ void LockTable::Resize() { // ScopedRowLock // ============================================================================ -ScopedRowLock::ScopedRowLock(LockManager *manager, +ScopedRowLock::ScopedRowLock(LockManager* manager, const OpState* op, - const Slice &key, + ArrayView<Slice> keys, LockManager::LockMode mode) - : manager_(DCHECK_NOTNULL(manager)), - acquired_(false) { - ls_ = manager_->Lock(key, op, mode, &entry_); - - if (ls_ == LockManager::LOCK_ACQUIRED) { - acquired_ = true; - } else { - // the lock might already have been acquired by this op so - // simply check that we didn't get a LOCK_BUSY status (we should have waited) - CHECK_NE(ls_, LockManager::LOCK_BUSY); - } + : manager_(DCHECK_NOTNULL(manager)) { + DCHECK_EQ(LockManager::LOCK_EXCLUSIVE, mode); + entries_ = manager_->LockBatch(keys, op); } ScopedRowLock::ScopedRowLock(ScopedRowLock&& other) noexcept { @@ -302,12 +339,7 @@ ScopedRowLock& ScopedRowLock::operator=(ScopedRowLock&& other) noexcept { void ScopedRowLock::TakeState(ScopedRowLock* other) { manager_ = other->manager_; - acquired_ = other->acquired_; - entry_ = other->entry_; - ls_ = other->ls_; - - other->acquired_ = false; - other->entry_ = nullptr; + entries_ = std::move(other->entries_); } ScopedRowLock::~ScopedRowLock() { @@ -315,11 +347,12 @@ ScopedRowLock::~ScopedRowLock() { } void ScopedRowLock::Release() { - if (entry_) { - manager_->Release(entry_, ls_); - acquired_ = false; - entry_ = nullptr; + if (entries_.empty()) return; // Already released. 
+ for (auto* entry : entries_) { + DCHECK_NOTNULL(entry)->Unlock(); } + manager_->ReleaseBatch(entries_); + entries_.clear(); } // ============================================================================ @@ -334,18 +367,24 @@ LockManager::~LockManager() { delete locks_; } -LockManager::LockStatus LockManager::Lock(const Slice& key, - const OpState* op, - LockManager::LockMode mode, - LockEntry** entry) { - *entry = locks_->GetLockEntry(key); +std::vector<LockEntry*> LockManager::LockBatch(ArrayView<Slice> keys, const OpState* op) { + vector<LockEntry*> entries = locks_->GetLockEntries(keys); + for (auto* e : entries) { + AcquireLockOnEntry(e, op); + } + return entries; +} + +void LockManager::ReleaseBatch(ArrayView<LockEntry*> locks) { locks_->ReleaseLockEntries(locks); } + +void LockManager::AcquireLockOnEntry(LockEntry* entry, const OpState* op) { // We expect low contention, so just try to try_lock first. This is faster // than a timed_lock, since we don't have to do a syscall to get the current // time. - if (!(*entry)->sem.TryAcquire()) { - // If the current holder of this lock is the same op just return - // a LOCK_ALREADY_ACQUIRED status without actually acquiring the mutex. + if (!entry->sem.TryAcquire()) { + // If the current holder of this lock is the same op just increment + // the recursion count without acquiring the mutex. // // // NOTE: This is not a problem for the current way locks are managed since @@ -353,9 +392,9 @@ LockManager::LockStatus LockManager::Lock(const Slice& key, // obtained and released at the same time). If at any time in the future // we opt to perform more fine grained locking, possibly letting ops // release a portion of the locks they no longer need, this no longer is OK. 
- if (ANNOTATE_UNPROTECTED_READ((*entry)->holder_) == op) { - (*entry)->recursion_++; - return LOCK_ACQUIRED; + if (ANNOTATE_UNPROTECTED_READ(entry->holder_) == op) { + entry->recursion_++; + return; } // If we couldn't immediately acquire the lock, do a timed lock so we can @@ -365,10 +404,10 @@ LockManager::LockStatus LockManager::Lock(const Slice& key, TRACE_COUNTER_INCREMENT("row_lock_wait_count", 1); MicrosecondsInt64 start_wait_us = GetMonoTimeMicros(); int waited_seconds = 0; - while (!(*entry)->sem.TimedAcquire(MonoDelta::FromSeconds(1))) { - const OpState* cur_holder = ANNOTATE_UNPROTECTED_READ((*entry)->holder_); + while (!entry->sem.TimedAcquire(MonoDelta::FromSeconds(1))) { + const OpState* cur_holder = ANNOTATE_UNPROTECTED_READ(entry->holder_); LOG(WARNING) << "Waited " << (++waited_seconds) << " seconds to obtain row lock on key " - << KUDU_REDACT(key.ToDebugString()) << " cur holder: " << cur_holder; + << entry->ToString() << " cur holder: " << cur_holder; // TODO(unknown): would be nice to also include some info about the blocking op, // but it's a bit tricky to do in a non-racy fashion (the other op may // complete at any point) @@ -376,39 +415,35 @@ LockManager::LockStatus LockManager::Lock(const Slice& key, MicrosecondsInt64 wait_us = GetMonoTimeMicros() - start_wait_us; TRACE_COUNTER_INCREMENT("row_lock_wait_us", wait_us); if (wait_us > 100 * 1000) { - TRACE("Waited $0us for lock on $1", wait_us, KUDU_REDACT(key.ToDebugString())); + TRACE("Waited $0us for lock on $1", wait_us, KUDU_REDACT(entry->ToString())); } } - (*entry)->holder_ = op; - return LOCK_ACQUIRED; + entry->holder_ = op; } -LockManager::LockStatus LockManager::TryLock(const Slice& key, - const OpState* op, - LockManager::LockMode mode, - LockEntry **entry) { +bool LockManager::TryLock(const Slice& key, const OpState* op, LockEntry** entry) { *entry = locks_->GetLockEntry(key); bool locked = (*entry)->sem.TryAcquire(); if (!locked) { - locks_->ReleaseLockEntry(*entry); - return 
LOCK_BUSY; + Release(*entry); + return false; } (*entry)->holder_ = op; - return LOCK_ACQUIRED; + return true; } -void LockManager::Release(LockEntry *lock, LockStatus ls) { - DCHECK_NOTNULL(lock)->holder_ = nullptr; - if (ls == LOCK_ACQUIRED) { - if (lock->recursion_ > 0) { - lock->recursion_--; - } else { - lock->sem.Release(); - } +void LockEntry::Unlock() { + DCHECK(holder_); + if (recursion_ > 0) { + recursion_--; + } else { + holder_ = nullptr; + sem.Release(); } - locks_->ReleaseLockEntry(lock); } +void LockManager::Release(LockEntry* lock) { locks_->ReleaseLockEntries({&lock, 1}); } + } // namespace tablet } // namespace kudu diff --git a/src/kudu/tablet/lock_manager.h b/src/kudu/tablet/lock_manager.h index f3170e5..3f761a4 100644 --- a/src/kudu/tablet/lock_manager.h +++ b/src/kudu/tablet/lock_manager.h @@ -17,15 +17,20 @@ #ifndef KUDU_TABLET_LOCK_MANAGER_H #define KUDU_TABLET_LOCK_MANAGER_H -#include <cstddef> +#include <vector> #include "kudu/gutil/macros.h" #include "kudu/util/slice.h" +namespace kudu { +template <typename T> +class ArrayView; +} // namespace kudu + namespace kudu { namespace tablet { -class LockTable; class LockEntry; +class LockTable; class OpState; // Super-simple lock manager implementation. 
This only supports exclusive @@ -40,11 +45,6 @@ class LockManager { LockManager(); ~LockManager(); - enum LockStatus { - LOCK_ACQUIRED = 0, - LOCK_BUSY = 1, - }; - enum LockMode { LOCK_EXCLUSIVE }; @@ -53,22 +53,23 @@ class LockManager { friend class ScopedRowLock; friend class LockManagerTest; - LockStatus Lock(const Slice& key, const OpState* op, - LockMode mode, LockEntry **entry); - LockStatus TryLock(const Slice& key, const OpState* op, - LockMode mode, LockEntry **entry); - void Release(LockEntry *lock, LockStatus ls); + std::vector<LockEntry*> LockBatch(ArrayView<Slice> keys, const OpState* op); + + bool TryLock(const Slice& key, const OpState* op, LockEntry** entry); + void Release(LockEntry* lock); + void ReleaseBatch(ArrayView<LockEntry*> locks); + + static void AcquireLockOnEntry(LockEntry* e, const OpState* op); LockTable *locks_; DISALLOW_COPY_AND_ASSIGN(LockManager); }; - -// Hold a lock on a given row, for the scope of this object. +// Hold a lock on a set of rows, for the scope of this object. // Usage: // { -// ScopedRowLock(&manager, my_encoded_row_key, LOCK_EXCLUSIVE); +// ScopedRowLock(&manager, op, my_encoded_row_keys, LOCK_EXCLUSIVE); // .. do stuff with the row .. // } // // lock is released when the object exits its scope. @@ -92,16 +93,14 @@ class ScopedRowLock { // l = ScopedRowLock(...); // or // l = std::move(other_row_lock); - ScopedRowLock() - : manager_(NULL), - acquired_(false), - entry_(NULL) { - } + ScopedRowLock() {} - // Lock row in the given LockManager. The 'key' slice must remain + // Lock rows in the given LockManager. The 'key' slices must remain // valid and un-changed for the duration of this object's lifetime. - ScopedRowLock(LockManager *manager, const OpState* ctx, - const Slice &key, LockManager::LockMode mode); + ScopedRowLock(LockManager* manager, + const OpState* op, + ArrayView<Slice> keys, + LockManager::LockMode mode); // Move constructor and assignment. 
ScopedRowLock(ScopedRowLock&& other) noexcept; @@ -109,20 +108,16 @@ class ScopedRowLock { void Release(); - bool acquired() const { return acquired_; } - - LockManager::LockStatus GetLockStatusForTests() { return ls_; } + bool acquired() const { return !entries_.empty(); } ~ScopedRowLock(); private: void TakeState(ScopedRowLock* other); - LockManager *manager_; + LockManager* manager_ = nullptr; - bool acquired_; - LockEntry *entry_; - LockManager::LockStatus ls_; + std::vector<LockEntry*> entries_; }; } // namespace tablet diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc index aa27c97..a20cdb5 100644 --- a/src/kudu/tablet/ops/write_op.cc +++ b/src/kudu/tablet/ops/write_op.cc @@ -25,6 +25,7 @@ #include <ostream> #include <vector> +#include <boost/container/small_vector.hpp> #include <boost/optional/optional.hpp> #include <gflags/gflags.h> #include <glog/logging.h> @@ -59,6 +60,7 @@ #include "kudu/util/metrics.h" #include "kudu/util/pb_util.h" #include "kudu/util/rw_semaphore.h" +#include "kudu/util/slice.h" #include "kudu/util/trace.h" DEFINE_int32(tablet_inject_latency_on_apply_write_txn_ms, 0, @@ -446,13 +448,22 @@ void WriteOpState::UpdateMetricsForOp(const RowOp& op) { } } -void WriteOpState::ReleaseRowLocks() { - // free the row locks +void WriteOpState::AcquireRowLocks(LockManager* lock_manager) { + DCHECK(!rows_lock_.acquired()); + + boost::container::small_vector<Slice, 8> keys; + keys.reserve(row_ops_.size()); + for (RowOp* op : row_ops_) { - op->row_lock.Release(); + if (op->has_result()) continue; + keys.push_back(op->key_probe->encoded_key_slice()); } + + rows_lock_ = ScopedRowLock(lock_manager, this, keys, LockManager::LOCK_EXCLUSIVE); } +void WriteOpState::ReleaseRowLocks() { rows_lock_.Release(); } + WriteOpState::~WriteOpState() { Reset(); } diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h index e803692..2bb5201 100644 --- a/src/kudu/tablet/ops/write_op.h +++ b/src/kudu/tablet/ops/write_op.h 
@@ -32,6 +32,7 @@ #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/tablet/ops/op.h" +#include "kudu/tablet/lock_manager.h" #include "kudu/tablet/rowset.h" #include "kudu/tserver/tserver.pb.h" #include "kudu/util/bitset.h" @@ -161,6 +162,9 @@ class WriteOpState : public OpState { // the writes. void AcquireSchemaLock(rw_semaphore* schema_lock); + // Acquire row locks for all of the rows in this Write. + void AcquireRowLocks(LockManager* lock_manager); + // Release the already-acquired schema lock. void ReleaseSchemaLock(); @@ -247,6 +251,9 @@ class WriteOpState : public OpState { // Protected by op_state_lock_. std::vector<RowOp*> row_ops_; + // Holds the LockManager locks acquired for this operation. + ScopedRowLock rows_lock_; + // Array of ProbeStats for each of the operations in 'row_ops_'. // Allocated from this op's arena during SetRowOps(). ProbeStats* stats_array_ = nullptr; diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h index abaa319..19a9092 100644 --- a/src/kudu/tablet/row_op.h +++ b/src/kudu/tablet/row_op.h @@ -19,7 +19,6 @@ #include <string> #include "kudu/common/row_operations.h" -#include "kudu/tablet/lock_manager.h" namespace google { namespace protobuf { @@ -67,10 +66,6 @@ struct RowOp { orig_result_from_log = orig_result; } - bool has_row_lock() const { - return row_lock.acquired(); - } - bool has_result() const { return result != nullptr; } @@ -93,10 +88,6 @@ struct RowOp { // Allocated on the op state's Arena. RowSetKeyProbe* key_probe = nullptr; - // The row lock which has been acquired for this row. Set during the "prepare" - // phase. - ScopedRowLock row_lock; - // Flag whether this op has already been validated as valid. 
bool valid = false; diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index 78d2c5a..176d8f4 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -486,10 +486,21 @@ Status Tablet::AcquireRowLocks(WriteOpState* op_state) { TRACE_EVENT1("tablet", "Tablet::AcquireRowLocks", "num_locks", op_state->row_ops().size()); TRACE("Acquiring locks for $0 operations", op_state->row_ops().size()); + for (RowOp* op : op_state->row_ops()) { if (op->has_result()) continue; - RETURN_NOT_OK(AcquireLockForOp(op_state, op)); + + ConstContiguousRow row_key(&key_schema_, op->decoded_op.row_data); + Arena* arena = op_state->arena(); + op->key_probe = arena->NewObject<RowSetKeyProbe>(row_key, arena); + if (PREDICT_FALSE(!ValidateOpOrMarkFailed(op))) { + continue; + } + RETURN_NOT_OK(CheckRowInTablet(row_key)); } + + op_state->AcquireRowLocks(&lock_manager_); + TRACE("Locks acquired"); return Status::OK(); } @@ -510,22 +521,6 @@ Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const { return Status::OK(); } -Status Tablet::AcquireLockForOp(WriteOpState* op_state, RowOp* op) { - ConstContiguousRow row_key(&key_schema_, op->decoded_op.row_data); - Arena* arena = op_state->arena(); - op->key_probe = arena->NewObject<RowSetKeyProbe>(row_key, arena); - if (PREDICT_FALSE(!ValidateOpOrMarkFailed(op))) { - return Status::OK(); - } - RETURN_NOT_OK(CheckRowInTablet(row_key)); - - op->row_lock = ScopedRowLock(&lock_manager_, - op_state, - op->key_probe->encoded_key_slice(), - LockManager::LOCK_EXCLUSIVE); - return Status::OK(); -} - void Tablet::AssignTimestampAndStartOpForTests(WriteOpState* op_state) { CHECK(!op_state->has_timestamp()); // Don't support COMMIT_WAIT for tests that don't boot a tablet server. 
@@ -998,7 +993,6 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context, Substitute("Apply of $0 exited early", op_state->ToString())); CHECK(s == kOpen || s == kBootstrapping); } - DCHECK(row_op->has_row_lock()) << "RowOp must hold the row lock."; DCHECK(op_state != nullptr) << "must have a WriteOpState"; DCHECK(op_state->op_id().IsInitialized()) << "OpState OpId needed for anchoring"; DCHECK_EQ(op_state->schema_at_decode_time(), schema()); diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index d9cbf85..d0cdfb1 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -144,10 +144,7 @@ class Tablet { WriteOpState* op_state); // Acquire locks for each of the operations in the given txn. - // - // Note that, if this fails, it's still possible that the op state holds - // _some_ of the locks. In that case, we expect that the op will still clean - // them up when it is aborted (or otherwise destructed). + // This also sets the row op's RowSetKeyProbe. Status AcquireRowLocks(WriteOpState* op_state); // Starts an MVCC op which must have a pre-assigned timestamp. @@ -160,24 +157,6 @@ class Tablet { // don't boot a tablet server. void AssignTimestampAndStartOpForTests(WriteOpState* op_state); - // Insert a new row into the tablet. - // - // The provided 'data' slice should have length equivalent to this - // tablet's Schema.byte_size(). - // - // After insert, the row and any referred-to memory (eg for strings) - // have been copied into internal memory, and thus the provided memory - // buffer may safely be re-used or freed. - // - // Returns Status::AlreadyPresent() if an entry with the same key is already - // present in the tablet. - // Returns Status::OK unless allocation fails. - // - // Acquires the row lock for the given operation, setting it in the - // RowOp struct. This also sets the row op's RowSetKeyProbe. - Status AcquireLockForOp(WriteOpState* op_state, - RowOp* op); - // Signal that the given op is about to Apply. 
void StartApplying(WriteOpState* op_state);
