This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6983d25 KUDU-2612: add ScopedPartitionLock
6983d25 is described below
commit 6983d25c35df8569c1851bf27e2c75e98036c10d
Author: hahao <[email protected]>
AuthorDate: Sat Feb 20 13:03:58 2021 -0800
KUDU-2612: add ScopedPartitionLock
This patch introduces a coarse-grained partition-level lock
ScopedPartitionLock to prevent dirty writes for multi-row transactions,
similar to ScopedRowLock, but locking the entire partition
instead of individual rows.
A partition lock can only be held by a single transaction at a time. A
given transaction can acquire the lock multiple times. To prevent
deadlocks, a wait-die scheme is used -- if a transaction requires a lock
held by another transaction:
1. Retry the op if the requesting transaction has a lower txn ID than
the current holder ("wait"),
2. Otherwise abort the requesting transaction immediately ("die").
A later patch will plumb this locking into participant ops and
transactional write ops.
Change-Id: I158115739ce3e7cfb77bbcb854e834336c1256b1
Reviewed-on: http://gerrit.cloudera.org:8080/17097
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Andrew Wong <[email protected]>
---
src/kudu/tablet/lock_manager-test.cc | 233 ++++++++++++++++++++++++++++++++++-
src/kudu/tablet/lock_manager.cc | 179 ++++++++++++++++++++++++++-
src/kudu/tablet/lock_manager.h | 137 ++++++++++++++++++--
src/kudu/tserver/tserver.proto | 7 ++
4 files changed, 540 insertions(+), 16 deletions(-)
diff --git a/src/kudu/tablet/lock_manager-test.cc
b/src/kudu/tablet/lock_manager-test.cc
index d2d9005..7ff727a 100644
--- a/src/kudu/tablet/lock_manager-test.cc
+++ b/src/kudu/tablet/lock_manager-test.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cstdint>
+#include <cstdlib>
#include <memory>
#include <mutex>
#include <ostream>
@@ -30,14 +31,20 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/stringprintf.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/array_view.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util.h"
+using kudu::tserver::TabletServerErrorPB;
using std::shared_ptr;
using std::string;
using std::thread;
@@ -53,7 +60,7 @@ class LockEntry;
class OpState;
static const OpState* kFakeTransaction =
- reinterpret_cast<OpState*>(0xdeadbeef);
+ reinterpret_cast<OpState*>(0xdeadbeef);
class LockManagerTest : public KuduTest {
public:
@@ -116,7 +123,7 @@ TEST_F(LockManagerTest, TestRelockSameRow) {
VerifyAlreadyLocked(key_a[0]);
}
-TEST_F(LockManagerTest, TestMoveLock) {
+TEST_F(LockManagerTest, TestMoveRowLock) {
// Acquire a lock.
Slice key_a[] = {"a"};
ScopedRowLock row_lock(&lock_manager_, kFakeTransaction, key_a,
LockManager::LOCK_EXCLUSIVE);
@@ -128,6 +135,228 @@ TEST_F(LockManagerTest, TestMoveLock) {
ASSERT_FALSE(row_lock.acquired()); // NOLINT(bugprone-use-after-move)
}
+TEST_F(LockManagerTest, TestLockUnlockPartitionSingleTxn) {
+ TxnId id(0);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ {
+ ScopedPartitionLock l(&lock_manager_, id);
+ ASSERT_TRUE(l.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+ }
+
+ // The same transaction should be able to acquire the partition lock multiple
+ // times.
+ ScopedPartitionLock first_lock(&lock_manager_, id);
+ ASSERT_TRUE(first_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ {
+ ScopedPartitionLock second_lock(&lock_manager_, id);
+ ASSERT_TRUE(second_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(2, lock_manager_.partition_lock_refs());
+ }
+
+ // 'partition_lock_refs' should decrease automatically when a
+ // ScopedPartitionLock goes out of scope.
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+ ASSERT_TRUE(first_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+}
+
+TEST_F(LockManagerTest, TestLockUnlockPartitionAbort) {
+ // Acquiring a lock that is held by another transaction with a lower txn ID
+ // will get a TXN_LOCKED_ABORT server error code.
+ TxnId txn1(1);
+ ScopedPartitionLock first_lock(&lock_manager_, txn1);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_TRUE(first_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+
+ TxnId txn2(2);
+ ScopedPartitionLock second_lock(&lock_manager_, txn2);
+ ASSERT_FALSE(second_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::TXN_LOCKED_ABORT, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestLockUnlockPartitionRetry) {
+ // Acquiring a lock that is held by another transaction with a higher txn ID
+ // will get a 'TXN_LOCKED_RETRY' server error code.
+ TxnId txn2(2);
+ ScopedPartitionLock first_lock(&lock_manager_, txn2);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_TRUE(first_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+
+ TxnId txn1(1);
+ ScopedPartitionLock second_lock(&lock_manager_, txn1);
+ ASSERT_FALSE(second_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::TXN_LOCKED_RETRY_OP, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestSerialWaitForLockPartition) {
+ // Run serially in an order that we know for sure the lock can be taken. And
+ // it is safe to use 'WAIT_FOR_LOCK' lock mode.
+ constexpr const int kNumTxn = 5;
+ for (int i = 0; i < kNumTxn; i++) {
+ TxnId id(i);
+ ScopedPartitionLock l(&lock_manager_, id, LockManager::WAIT_FOR_LOCK);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ CHECK(l.IsAcquired(&code));
+ CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ CHECK_EQ(1, lock_manager_.partition_lock_refs());
+ }
+}
+
+TEST_F(LockManagerTest, TestConcurrentWaitForLockPartition) {
+ // The txn ID doesn't matter much for WAIT_FOR_LOCK, since it's assumed that
+ // callers have already ensured that the lock acquisition order is valid
+ // (e.g. by using TRY_LOCK mode on participant leaders).
+ constexpr const int kMaxTxnId = 5;
+ ScopedPartitionLock l(&lock_manager_, rand() % kMaxTxnId,
LockManager::WAIT_FOR_LOCK);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_TRUE(l.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+ thread t([&] {
+ ScopedPartitionLock l(&lock_manager_, rand() % kMaxTxnId,
LockManager::WAIT_FOR_LOCK);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ CHECK(l.IsAcquired(&code));
+ CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ CHECK_EQ(1, lock_manager_.partition_lock_refs());
+ });
+ SCOPED_CLEANUP({ t.join(); });
+ SleepFor(MonoDelta::FromSeconds(3));
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+ l.Release();
+}
+
+TEST_F(LockManagerTest, TestWaitDieDeadlockPrevention) {
+ LockManager other_lock_manager;
+ vector<thread> threads;
+ constexpr const int kNumTxns = 10;
+ CountDownLatch latch(kNumTxns);
+ for (int t = 0; t < kNumTxns; t++) {
+ threads.emplace_back([&, t] {
+ latch.CountDown();
+ TxnId id(t);
+ ScopedPartitionLock locks[2];
+ // Functor that acquires the given lock manager following the wait-die
+ // scheme.
+ const auto lock_txn = [&] (LockManager* lm, int lock_idx) {
+ bool acquired = false;
+ while (true) {
+ locks[lock_idx] = ScopedPartitionLock(lm, id);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ acquired = locks[lock_idx].IsAcquired(&code);
+ if (acquired) {
+ LOG(INFO) << "Acquired lock from txn " << t;
+ CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ CHECK_GE(lm->partition_lock_refs(), 1);
+ break;
+ } else if (code == TabletServerErrorPB::TXN_LOCKED_ABORT) {
+ LOG(INFO) << "Aborting txn " << t;
+ break;
+ } else if (code == TabletServerErrorPB::TXN_LOCKED_RETRY_OP) {
+ LOG(INFO) << "Retrying txn " << t;
+ continue;
+ } else {
+ FAIL() << "Unexpected code " << code;
+ }
+ }
+ };
+ // Have each transaction attempt to lock one lock manager and then the
+ // other, at random. With a correct wait-die scheme, this should not get
+ // stuck in a deadlock.
+ // NOTE: this simulates multiple transactions attempting to lock multiple
+ // participants at once.
+ LockManager* lm1 = rand() % 2 ? &lock_manager_ : &other_lock_manager;
+ LockManager* lm2 = lm1 == &lock_manager_ ? &other_lock_manager :
&lock_manager_;
+ lock_txn(lm1, 0);
+ lock_txn(lm2, 1);
+ });
+ }
+ std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+ ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestConcurrentLockUnlockPartitionWithInvalidID) {
+ vector<thread> threads;
+ constexpr const int kNumStartAtOnce = 5;
+ CountDownLatch latch(kNumStartAtOnce);
+ for (int t = 0; t < kNumStartAtOnce - 1; t++) {
+ threads.emplace_back([&] {
+ latch.CountDown();
+ TxnId id(1);
+ ScopedPartitionLock l(&lock_manager_, id);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ bool acquired = l.IsAcquired(&code);
+      // The lock may not be acquired if a non-transactional op takes the
+      // lock first.
+ if (acquired) {
+ CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ CHECK_GE(lock_manager_.partition_lock_refs(), 1);
+ }
+ });
+ }
+
+ // Actors with an invalid ID (i.e. non-transactional ops) can also acquire
+ // the lock.
+ threads.emplace_back([&] {
+ latch.CountDown();
+ TxnId id(TxnId::kInvalidTxnId);
+ ScopedPartitionLock l(&lock_manager_, id);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ bool acquired = l.IsAcquired(&code);
+ if (acquired) {
+ CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ CHECK_EQ(1, lock_manager_.partition_lock_refs());
+ }
+ });
+
+ std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+ ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestMovePartitionLock) {
+ {
+ // Acquire a lock.
+ TxnId id(0);
+ ScopedPartitionLock partition_lock(&lock_manager_, id);
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_TRUE(partition_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+
+ // Move it to a new instance.
+ ScopedPartitionLock moved_lock(std::move(partition_lock));
+ ASSERT_TRUE(moved_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_FALSE(partition_lock.IsAcquired(&code)); //
NOLINT(bugprone-use-after-move)
+ ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+ }
+
+ {
+ // Acquire an empty lock.
+ ScopedPartitionLock partition_lock;
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_FALSE(partition_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+
+ // Move it to a new instance.
+ ScopedPartitionLock moved_lock(std::move(partition_lock));
+ ASSERT_FALSE(moved_lock.IsAcquired(&code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_FALSE(partition_lock.IsAcquired(&code)); //
NOLINT(bugprone-use-after-move)
+ ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+ }
+}
+
class LmTestResource {
public:
explicit LmTestResource(Slice id) : id_(id), owner_(0), is_owned_(false) {}
diff --git a/src/kudu/tablet/lock_manager.cc b/src/kudu/tablet/lock_manager.cc
index 01667fe..bf9d57f 100644
--- a/src/kudu/tablet/lock_manager.cc
+++ b/src/kudu/tablet/lock_manager.cc
@@ -18,7 +18,7 @@
#include "kudu/tablet/lock_manager.h"
#include <cstddef>
-#include <cstdint>
+#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
@@ -28,10 +28,13 @@
#include <glog/logging.h>
+#include "kudu/common/txn_id.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/array_view.h"
#include "kudu/util/faststring.h"
#include "kudu/util/locks.h"
@@ -40,10 +43,12 @@
#include "kudu/util/semaphore.h"
#include "kudu/util/trace.h"
+using kudu::tserver::TabletServerErrorPB;
using std::string;
using std::unique_lock;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
namespace kudu {
namespace tablet {
@@ -356,17 +361,177 @@ void ScopedRowLock::Release() {
}
// ============================================================================
+// ScopedPartitionLock
+// ============================================================================
+
+// A coarse-grained partition-level lock to prevent concurrent transactions on
+// a given tablet. Each lock is associated with a single transaction at a time.
+class PartitionLockState {
+ public:
+ explicit PartitionLockState(const TxnId& txn_id)
+ : txn_id_(txn_id) {}
+ TxnId txn_id() const {
+ return txn_id_;
+ }
+ private:
+ // The transaction ID that holds the partition lock.
+ const TxnId txn_id_;
+};
+
+ScopedPartitionLock::ScopedPartitionLock(LockManager* manager,
+ const TxnId& txn_id,
+ LockManager::LockWaitMode wait_mode)
+ : manager_(DCHECK_NOTNULL(manager)),
+ code_(TabletServerErrorPB::UNKNOWN_ERROR) {
+ switch (wait_mode) {
+ case LockManager::TRY_LOCK:
+ lock_state_ = manager_->TryAcquirePartitionLock(txn_id, &code_,
MonoDelta::FromSeconds(1));
+ break;
+ case LockManager::WAIT_FOR_LOCK:
+ lock_state_ = manager_->WaitUntilAcquiredPartitionLock(txn_id);
+ DCHECK(lock_state_);
+ break;
+ default:
+ LOG(DFATAL) << "not reachable";
+ break;
+ }
+}
+
+ScopedPartitionLock::~ScopedPartitionLock() {
+ if (manager_) {
+ Release();
+ }
+}
+
+bool ScopedPartitionLock::IsAcquired(tserver::TabletServerErrorPB::Code* code)
const {
+ if (lock_state_) {
+ return true;
+ }
+ *code = code_;
+ return false;
+}
+
+void ScopedPartitionLock::Release() {
+ // Already released.
+ if (!lock_state_) {
+ return;
+ }
+ manager_->ReleasePartitionLock();
+ lock_state_ = nullptr;
+}
+
+ScopedPartitionLock::ScopedPartitionLock(ScopedPartitionLock&& other) noexcept
{
+ TakeState(&other);
+}
+
+ScopedPartitionLock& ScopedPartitionLock::operator=(ScopedPartitionLock&&
other) noexcept {
+ TakeState(&other);
+ return *this;
+}
+
+void ScopedPartitionLock::TakeState(ScopedPartitionLock* other) {
+ DCHECK(other != this);
+ manager_ = other->manager_;
+ lock_state_ = other->lock_state_;
+ code_ = other->code_;
+ other->lock_state_ = nullptr;
+}
+
+// ============================================================================
// LockManager
// ============================================================================
LockManager::LockManager()
- : locks_(new LockTable()) {
+ : partition_sem_(1),
+ partition_lock_refs_(0),
+ locks_(new LockTable) {
}
LockManager::~LockManager() {
delete locks_;
}
+PartitionLockState* LockManager::TryAcquirePartitionLock(
+ const TxnId& txn_id,
+ TabletServerErrorPB::Code* code,
+ const MonoDelta& timeout) {
+  // Favor transactional ops over non-transactional ones by giving
+  // non-transactional ops the maximum txn ID. We favor transactional ops here
+ // because aborting and retrying a transaction likely entails retrying
+ // several ops.
+ //
+ // TODO(hao): this may result in lock starvation for non-transactional ops.
+ // We should evaluate strategies to avoid this.
+ const auto requested_id = txn_id.IsValid() ?
+ txn_id.value() : std::numeric_limits<int64_t>::max();
+
+ // The most anticipated case is the lock is being re-acquired multiple times.
+ {
+ std::lock_guard<simple_spinlock> l(p_lock_);
+ if (partition_lock_ &&
+ PREDICT_TRUE(requested_id == partition_lock_->txn_id().value())) {
+ DCHECK_GT(partition_lock_refs_, 0);
+ DCHECK_GE(0, partition_sem_.GetValue());
+ partition_lock_refs_ += 1;
+ return partition_lock_.get();
+ }
+ }
+
+ // We expect low contention, so use TryAcquire first so we don't have to do a
+ // syscall to get the current time.
+ if (!partition_sem_.TryAcquire()) {
+ const MonoTime start(MonoTime::Now());
+ while (!partition_sem_.TimedAcquire(MonoDelta::FromMilliseconds(250))) {
+ bool has_timeout = timeout.Initialized();
+ MonoDelta elapsed;
+ if (has_timeout) {
+ elapsed = MonoTime::Now() - start;
+ }
+ if (has_timeout && elapsed > timeout) {
+ LOG(WARNING) << Substitute("Txn $0 has not acquired the partition lock
after $1ms",
+ requested_id, elapsed.ToMilliseconds());
+ // If we're still unable to take 'partition_sem_', but
'partition_lock_'
+        // is unset, just try again -- another thread is likely in the midst of
+        // unsetting it and 'partition_sem_' should be available soon.
+ std::lock_guard<simple_spinlock> l(p_lock_);
+ if (!partition_lock_) {
+ continue;
+ }
+        // If the requestor requires a lock held by another transaction, abort
+        // the requesting transaction immediately if it has a higher txn ID than
+        // the transaction holding the lock. Otherwise, let the requestor retry.
+ //
+ // TODO(hao): generalize deadlock prevention scheme when adding new
+ // scheme or lock type.
+ *code = requested_id > partition_lock_->txn_id().value() ?
+ TabletServerErrorPB::TXN_LOCKED_ABORT :
TabletServerErrorPB::TXN_LOCKED_RETRY_OP;
+ return nullptr;
+ }
+ }
+ }
+ std::lock_guard<simple_spinlock> l(p_lock_);
+ DCHECK_GE(0, partition_sem_.GetValue());
+ DCHECK(!partition_lock_);
+ DCHECK_EQ(partition_lock_refs_, 0);
+ // No one is holding the lock -- take it now.
+ partition_lock_.reset(new PartitionLockState(requested_id));
+ partition_lock_refs_ = 1;
+ return partition_lock_.get();
+}
+
+PartitionLockState* LockManager::WaitUntilAcquiredPartitionLock(const TxnId&
txn_id) {
+ MicrosecondsInt64 start_wait_us = GetMonoTimeMicros();
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ PartitionLockState* lock = TryAcquirePartitionLock(txn_id, &code);
+ CHECK(lock);
+ MicrosecondsInt64 wait_us = GetMonoTimeMicros() - start_wait_us;
+ TRACE_COUNTER_INCREMENT("partition_lock_wait_us", wait_us);
+ if (wait_us > 100 * 1000) {
+ TRACE("Waited $0us to acquire the partition lock", wait_us);
+ }
+ return lock;
+}
+
std::vector<LockEntry*> LockManager::LockBatch(ArrayView<Slice> keys, const
OpState* op) {
vector<LockEntry*> entries = locks_->GetLockEntries(keys);
@@ -378,6 +543,15 @@ std::vector<LockEntry*>
LockManager::LockBatch(ArrayView<Slice> keys, const OpSt
void LockManager::ReleaseBatch(ArrayView<LockEntry*> locks) {
locks_->ReleaseLockEntries(locks); }
+void LockManager::ReleasePartitionLock() {
+ std::lock_guard<simple_spinlock> l(p_lock_);
+ DCHECK_GT(partition_lock_refs_, 0);
+ if (--partition_lock_refs_ == 0) {
+ partition_sem_.unlock();
+ partition_lock_.reset();
+ }
+}
+
void LockManager::AcquireLockOnEntry(LockEntry* entry, const OpState* op) {
// We expect low contention, so just try to try_lock first. This is faster
// than a timed_lock, since we don't have to do a syscall to get the current
@@ -386,7 +560,6 @@ void LockManager::AcquireLockOnEntry(LockEntry* entry,
const OpState* op) {
// If the current holder of this lock is the same op just increment
// the recursion count without acquiring the mutex.
//
- //
// NOTE: This is not a problem for the current way locks are managed since
// they are obtained and released in bulk (all locks for an op are
// obtained and released at the same time). If at any time in the future
diff --git a/src/kudu/tablet/lock_manager.h b/src/kudu/tablet/lock_manager.h
index 3f761a4..e7e6a75 100644
--- a/src/kudu/tablet/lock_manager.h
+++ b/src/kudu/tablet/lock_manager.h
@@ -14,32 +14,43 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_TABLET_LOCK_MANAGER_H
-#define KUDU_TABLET_LOCK_MANAGER_H
+#pragma once
+#include <cstdint>
+#include <memory>
#include <vector>
+#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/semaphore.h"
#include "kudu/util/slice.h"
namespace kudu {
+
template <typename T>
class ArrayView;
-} // namespace kudu
-namespace kudu { namespace tablet {
+namespace tablet {
class LockEntry;
class LockTable;
class OpState;
+class PartitionLockState;
-// Super-simple lock manager implementation. This only supports exclusive
-// locks, and makes no attempt to prevent deadlocks if a single thread
-// takes multiple locks.
+// Lock manager implementation that only supports exclusive locks. It is
+// composed of two types of lock: 'ScopedRowLock' for single-row lock and
+// 'ScopedPartitionLock' for partition lock.
//
-// In the future when we want to support multi-row transactions of some kind
-// we'll have to implement a proper lock manager with all its trappings,
-// but this should be enough for the single-row use case.
+// For deadlock prevention of multi-row transactions, a wait-die scheme is
+// used. When a transaction B attempts to take a lock that's already held by
+// transaction A:
+// - B > A: transaction B should be aborted, and applications should retry
+// B at later time, signified by the returning of TXN_LOCKED_ABORT.
+// - B < A: transaction B should wait for the lock to be taken by retrying the
+// transactional op, signified by the returning of TXN_LOCKED_RETRY_OP.
class LockManager {
public:
LockManager();
@@ -49,7 +60,21 @@ class LockManager {
LOCK_EXCLUSIVE
};
+ enum LockWaitMode {
+ // The attempt to take the lock must wait until the lock is taken. This is
+ // only appropriate if it is guaranteed that the lock can be waited on
+ // without a deadlock.
+ WAIT_FOR_LOCK,
+
+    // Try to acquire the lock with a timeout; if it's not available, return
+    // without acquiring it.
+ TRY_LOCK,
+ };
+
+ int64_t partition_lock_refs() const { return partition_lock_refs_; }
+
private:
+ friend class ScopedPartitionLock;
friend class ScopedRowLock;
friend class LockManagerTest;
@@ -59,8 +84,48 @@ class LockManager {
void Release(LockEntry* lock);
void ReleaseBatch(ArrayView<LockEntry*> locks);
+ // Tries to acquire the partition lock with the given txn ID with the given
+ // timeout, or tries indefinitely if no timeout is set. A partition lock can
+ // only be held by a single transaction at a time; the same transaction can
+ // acquire the lock multiple times. Both transactional and non-transactional
+ // ops must try to acquire the lock (non-transactional ops are signified with
+ // an invalid 'txn_id').
+ //
+ // If the attempt to lock fails, an appropriate error code is returned based
+ // on the transaction ID and the deadlock prevention policy described above.
+ PartitionLockState* TryAcquirePartitionLock(const TxnId& txn_id,
+
tserver::TabletServerErrorPB::Code* code,
+ const MonoDelta& timeout =
MonoDelta());
+
+ // Similar to the above, but waits until the lock is acquired.
+ //
+ // Note that the caller is expected to ensure there is no deadlock. For
+ // example, when running on followers in the prepare phase, or running
+ // serially in an order that has already been successful with
+ // TryAcquirePartitionLock() calls.
+ PartitionLockState* WaitUntilAcquiredPartitionLock(const TxnId& txn_id);
+ void ReleasePartitionLock();
+
static void AcquireLockOnEntry(LockEntry* e, const OpState* op);
+ // Semaphore used by the LockManager to signal the release of the partition
+  // lock. If its value is <= 0, the partition lock is already held, and
+  // callers can use TimedAcquire() to wait for it to be released.
+ Semaphore partition_sem_;
+
+ // Lock to protect 'partition_lock_' and 'partition_lock_refs_'.
+ simple_spinlock p_lock_;
+
+ // If 'partition_lock_' has been held by a transaction,
+ // 'partition_lock_refs_' keeps track of the number of times that the
+ // partition lock has been held by that transaction.
+ //
+ // NOTE: 'partition_lock_' is only set to non-null by a single thread (i.e.
+ // the prepare thread), but it may be released from a different thread (e.g.
+ // an apply thread).
+ int64_t partition_lock_refs_;
+ std::unique_ptr<PartitionLockState> partition_lock_;
+
LockTable *locks_;
DISALLOW_COPY_AND_ASSIGN(LockManager);
@@ -120,6 +185,56 @@ class ScopedRowLock {
std::vector<LockEntry*> entries_;
};
+// Similar to ScopedRowLock, hold a lock on a partition, for the scope of
+// this object. Usage:
+// {
+// ScopedPartitionLock(&manager, txn_id);
+// .. do stuff ..
+// }
+// // lock is released when the object exits its scope.
+class ScopedPartitionLock {
+ public:
+ // Construct an initially-unlocked lock holder.
+ // You can later assign this to actually hold a lock using
+ // the move-constructor:
+ // ScopedPartitionLock l;
+ // l = ScopedPartitionLock(...);
+ // or
+ // l = std::move(other_partition_lock);
+ ScopedPartitionLock()
+ : code_(tserver::TabletServerErrorPB::UNKNOWN_ERROR) {}
+
+ // 'wait_mode' indicates whether or not to wait until
+ // the lock is acquired.
+ ScopedPartitionLock(LockManager* manager,
+ const TxnId& txn_id,
+ LockManager::LockWaitMode wait_mode = LockManager::TRY_LOCK);
+ ~ScopedPartitionLock();
+
+ // Move constructor and assignment operator.
+ ScopedPartitionLock(ScopedPartitionLock&& other) noexcept;
+ ScopedPartitionLock& operator=(ScopedPartitionLock&& other) noexcept;
+
+ // Disable the copy constructor.
+ ScopedPartitionLock(const ScopedPartitionLock&) = delete;
+
+ // Check whether the partition lock is acquired by the transaction.
+ // If false, set the tablet server error code accordingly to abort
+ // or retry the transaction. Otherwise, no error code is set.
+ bool IsAcquired(tserver::TabletServerErrorPB::Code* code) const;
+
+ // Release a reference of the partition lock held by the transaction.
+ void Release();
+
+ private:
+ void TakeState(ScopedPartitionLock* other);
+
+ LockManager* manager_ = nullptr;
+ PartitionLockState* lock_state_ = nullptr;
+ // The tablet server error code is only set when the lock
+ // is not acquired. Otherwise, the default is 'UNKNOWN_ERROR'.
+ tserver::TabletServerErrorPB::Code code_;
+};
+
} // namespace tablet
} // namespace kudu
-#endif
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 4bd6c6b..9721bf8 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -109,6 +109,13 @@ message TabletServerErrorPB {
// The requested transaction participant op was already applied.
TXN_OP_ALREADY_APPLIED = 23;
+
+ // The requested transaction needs to be aborted for deadlock prevention.
+ TXN_LOCKED_ABORT = 24;
+
+ // The requested transaction participant op or write op needs to be
+ // retried, because the required lock is held by another transaction.
+ TXN_LOCKED_RETRY_OP = 25;
}
// The error code.