This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6983d25  KUDU-2612: add ScopedPartitionLock
6983d25 is described below

commit 6983d25c35df8569c1851bf27e2c75e98036c10d
Author: hahao <[email protected]>
AuthorDate: Sat Feb 20 13:03:58 2021 -0800

    KUDU-2612: add ScopedPartitionLock
    
    This patch introduces a coarse-grained partition-level lock
    ScopedPartitionLock to prevent dirty writes for multi-row transactions,
    similar to the ScopedRowLock, but for locking the entire LockManager
    instead of individual rows.
    
    A partition lock can only be held by a single transaction at a time. A
    given transaction can acquire the lock multiple times. To prevent
    deadlocks, a wait-die scheme is used -- if a transaction requires a lock
    held by another transaction:
    1. Retry the op if the requesting transaction has a lower txn ID than
       the current holder ("wait"),
    2. Otherwise abort the requesting transaction immediately ("die").
    
    A later patch will plumb this locking into participant ops and
    transactional write ops.
    
    Change-Id: I158115739ce3e7cfb77bbcb854e834336c1256b1
    Reviewed-on: http://gerrit.cloudera.org:8080/17097
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Andrew Wong <[email protected]>
---
 src/kudu/tablet/lock_manager-test.cc | 233 ++++++++++++++++++++++++++++++++++-
 src/kudu/tablet/lock_manager.cc      | 179 ++++++++++++++++++++++++++-
 src/kudu/tablet/lock_manager.h       | 137 ++++++++++++++++++--
 src/kudu/tserver/tserver.proto       |   7 ++
 4 files changed, 540 insertions(+), 16 deletions(-)

diff --git a/src/kudu/tablet/lock_manager-test.cc 
b/src/kudu/tablet/lock_manager-test.cc
index d2d9005..7ff727a 100644
--- a/src/kudu/tablet/lock_manager-test.cc
+++ b/src/kudu/tablet/lock_manager-test.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <cstdlib>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -30,14 +31,20 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/array_view.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util.h"
 
+using kudu::tserver::TabletServerErrorPB;
 using std::shared_ptr;
 using std::string;
 using std::thread;
@@ -53,7 +60,7 @@ class LockEntry;
 class OpState;
 
 static const OpState* kFakeTransaction =
-  reinterpret_cast<OpState*>(0xdeadbeef);
+    reinterpret_cast<OpState*>(0xdeadbeef);
 
 class LockManagerTest : public KuduTest {
  public:
@@ -116,7 +123,7 @@ TEST_F(LockManagerTest, TestRelockSameRow) {
   VerifyAlreadyLocked(key_a[0]);
 }
 
-TEST_F(LockManagerTest, TestMoveLock) {
+TEST_F(LockManagerTest, TestMoveRowLock) {
   // Acquire a lock.
   Slice key_a[] = {"a"};
   ScopedRowLock row_lock(&lock_manager_, kFakeTransaction, key_a, 
LockManager::LOCK_EXCLUSIVE);
@@ -128,6 +135,228 @@ TEST_F(LockManagerTest, TestMoveLock) {
   ASSERT_FALSE(row_lock.acquired()); // NOLINT(bugprone-use-after-move)
 }
 
+TEST_F(LockManagerTest, TestLockUnlockPartitionSingleTxn) {
+  TxnId id(0);
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  {
+    ScopedPartitionLock l(&lock_manager_, id);
+    ASSERT_TRUE(l.IsAcquired(&code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+  }
+
+  // The same transaction should be able to acquire the partition lock multiple
+  // times.
+  ScopedPartitionLock first_lock(&lock_manager_, id);
+  ASSERT_TRUE(first_lock.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  {
+    ScopedPartitionLock second_lock(&lock_manager_, id);
+    ASSERT_TRUE(second_lock.IsAcquired(&code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_EQ(2, lock_manager_.partition_lock_refs());
+  }
+
+  // 'partition_lock_refs' should decrease automatically when a
+  // ScopedPartitionLock goes out of scope.
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+  ASSERT_TRUE(first_lock.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+}
+
+TEST_F(LockManagerTest, TestLockUnlockPartitionAbort) {
+  // Acquiring a lock that is held by another transaction with a lower txn ID
+  // will get a TXN_LOCKED_ABORT server error code.
+  TxnId txn1(1);
+  ScopedPartitionLock first_lock(&lock_manager_, txn1);
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  ASSERT_TRUE(first_lock.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+
+  TxnId txn2(2);
+  ScopedPartitionLock second_lock(&lock_manager_, txn2);
+  ASSERT_FALSE(second_lock.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::TXN_LOCKED_ABORT, code);
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestLockUnlockPartitionRetry) {
+  // Acquiring a lock that is held by another transaction with a higher txn ID
+  // will get a 'TXN_LOCKED_RETRY' server error code.
+  TxnId txn2(2);
+  ScopedPartitionLock first_lock(&lock_manager_, txn2);
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  ASSERT_TRUE(first_lock.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+
+  TxnId txn1(1);
+  ScopedPartitionLock second_lock(&lock_manager_, txn1);
+  ASSERT_FALSE(second_lock.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::TXN_LOCKED_RETRY_OP, code);
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestSerialWaitForLockPartition) {
+  // Run serially in an order in which we know for sure the lock can be
+  // taken, so it is safe to use the 'WAIT_FOR_LOCK' lock mode.
+  constexpr const int kNumTxn = 5;
+  for (int i = 0; i < kNumTxn; i++) {
+    TxnId id(i);
+    ScopedPartitionLock l(&lock_manager_, id, LockManager::WAIT_FOR_LOCK);
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    CHECK(l.IsAcquired(&code));
+    CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    CHECK_EQ(1, lock_manager_.partition_lock_refs());
+  }
+}
+
+TEST_F(LockManagerTest, TestConcurrentWaitForLockPartition) {
+  // The txn ID doesn't matter much for WAIT_FOR_LOCK, since it's assumed that
+  // callers have already ensured that the lock acquisition order is valid
+  // (e.g. by using TRY_LOCK mode on participant leaders).
+  constexpr const int kMaxTxnId = 5;
+  ScopedPartitionLock l(&lock_manager_, rand() % kMaxTxnId, 
LockManager::WAIT_FOR_LOCK);
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  ASSERT_TRUE(l.IsAcquired(&code));
+  ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+  thread t([&] {
+    ScopedPartitionLock l(&lock_manager_, rand() % kMaxTxnId, 
LockManager::WAIT_FOR_LOCK);
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    CHECK(l.IsAcquired(&code));
+    CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    CHECK_EQ(1, lock_manager_.partition_lock_refs());
+  });
+  SCOPED_CLEANUP({ t.join(); });
+  SleepFor(MonoDelta::FromSeconds(3));
+  ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+  l.Release();
+}
+
+TEST_F(LockManagerTest, TestWaitDieDeadlockPrevention) {
+  LockManager other_lock_manager;
+  vector<thread> threads;
+  constexpr const int kNumTxns = 10;
+  CountDownLatch latch(kNumTxns);
+  for (int t = 0; t < kNumTxns; t++) {
+    threads.emplace_back([&, t] {
+      latch.CountDown();
+      TxnId id(t);
+      ScopedPartitionLock locks[2];
+      // Functor that acquires the given lock manager following the wait-die
+      // scheme.
+      const auto lock_txn = [&] (LockManager* lm, int lock_idx) {
+        bool acquired = false;
+        while (true) {
+          locks[lock_idx] = ScopedPartitionLock(lm, id);
+          TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+          acquired = locks[lock_idx].IsAcquired(&code);
+          if (acquired) {
+            LOG(INFO) << "Acquired lock from txn " << t;
+            CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+            CHECK_GE(lm->partition_lock_refs(), 1);
+            break;
+          } else if (code == TabletServerErrorPB::TXN_LOCKED_ABORT) {
+            LOG(INFO) << "Aborting txn " << t;
+            break;
+          } else if (code == TabletServerErrorPB::TXN_LOCKED_RETRY_OP) {
+            LOG(INFO) << "Retrying txn " << t;
+            continue;
+          } else {
+            FAIL() << "Unexpected code " << code;
+          }
+        }
+      };
+      // Have each transaction attempt to lock one lock manager and then the
+      // other, at random. With a correct wait-die scheme, this should not get
+      // stuck in a deadlock.
+      // NOTE: this simulates multiple transactions attempting to lock multiple
+      // participants at once.
+      LockManager* lm1 = rand() % 2 ? &lock_manager_ : &other_lock_manager;
+      LockManager* lm2 = lm1 == &lock_manager_ ? &other_lock_manager : 
&lock_manager_;
+      lock_txn(lm1, 0);
+      lock_txn(lm2, 1);
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+  ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestConcurrentLockUnlockPartitionWithInvalidID) {
+  vector<thread> threads;
+  constexpr const int kNumStartAtOnce = 5;
+  CountDownLatch latch(kNumStartAtOnce);
+  for (int t = 0; t < kNumStartAtOnce - 1; t++) {
+    threads.emplace_back([&] {
+      latch.CountDown();
+      TxnId id(1);
+      ScopedPartitionLock l(&lock_manager_, id);
+      TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+      bool acquired = l.IsAcquired(&code);
+      // The lock may not be acquired if a non-transactional op takes
+      // the lock first.
+      if (acquired) {
+        CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+        CHECK_GE(lock_manager_.partition_lock_refs(), 1);
+      }
+    });
+  }
+
+  // Actors with an invalid ID (i.e. non-transactional ops) can also acquire
+  // the lock.
+  threads.emplace_back([&] {
+    latch.CountDown();
+    TxnId id(TxnId::kInvalidTxnId);
+    ScopedPartitionLock l(&lock_manager_, id);
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    bool acquired = l.IsAcquired(&code);
+    if (acquired) {
+      CHECK_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+      CHECK_EQ(1, lock_manager_.partition_lock_refs());
+    }
+  });
+
+  std::for_each(threads.begin(), threads.end(), [&] (thread& t) { t.join(); });
+  ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+}
+
+TEST_F(LockManagerTest, TestMovePartitionLock) {
+  {
+    // Acquire a lock.
+    TxnId id(0);
+    ScopedPartitionLock partition_lock(&lock_manager_, id);
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    ASSERT_TRUE(partition_lock.IsAcquired(&code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+
+    // Move it to a new instance.
+    ScopedPartitionLock moved_lock(std::move(partition_lock));
+    ASSERT_TRUE(moved_lock.IsAcquired(&code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_FALSE(partition_lock.IsAcquired(&code)); // 
NOLINT(bugprone-use-after-move)
+    ASSERT_EQ(1, lock_manager_.partition_lock_refs());
+  }
+
+  {
+    // Acquire an empty lock.
+    ScopedPartitionLock partition_lock;
+    TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+    ASSERT_FALSE(partition_lock.IsAcquired(&code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+
+    // Move it to a new instance.
+    ScopedPartitionLock moved_lock(std::move(partition_lock));
+    ASSERT_FALSE(moved_lock.IsAcquired(&code));
+    ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+    ASSERT_FALSE(partition_lock.IsAcquired(&code));  // 
NOLINT(bugprone-use-after-move)
+    ASSERT_EQ(0, lock_manager_.partition_lock_refs());
+  }
+}
+
 class LmTestResource {
  public:
   explicit LmTestResource(Slice id) : id_(id), owner_(0), is_owned_(false) {}
diff --git a/src/kudu/tablet/lock_manager.cc b/src/kudu/tablet/lock_manager.cc
index 01667fe..bf9d57f 100644
--- a/src/kudu/tablet/lock_manager.cc
+++ b/src/kudu/tablet/lock_manager.cc
@@ -18,7 +18,7 @@
 #include "kudu/tablet/lock_manager.h"
 
 #include <cstddef>
-#include <cstdint>
+#include <limits>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -28,10 +28,13 @@
 
 #include <glog/logging.h>
 
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/hash/city.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/locks.h"
@@ -40,10 +43,12 @@
 #include "kudu/util/semaphore.h"
 #include "kudu/util/trace.h"
 
+using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::unique_lock;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace tablet {
@@ -356,17 +361,177 @@ void ScopedRowLock::Release() {
 }
 
 // ============================================================================
+//  ScopedPartitionLock
+// ============================================================================
+
+// A coarse grained partition level lock to prevent concurrent transactions to
+// a given tablet. Each lock is associated with a single transaction at a time.
+class PartitionLockState {
+ public:
+  explicit PartitionLockState(const TxnId& txn_id)
+      : txn_id_(txn_id) {}
+  TxnId txn_id() const {
+    return txn_id_;
+  }
+ private:
+  // The transaction ID that holds the partition lock.
+  const TxnId txn_id_;
+};
+
+ScopedPartitionLock::ScopedPartitionLock(LockManager* manager,
+                                         const TxnId& txn_id,
+                                         LockManager::LockWaitMode wait_mode)
+    : manager_(DCHECK_NOTNULL(manager)),
+      code_(TabletServerErrorPB::UNKNOWN_ERROR) {
+  switch (wait_mode) {
+    case LockManager::TRY_LOCK:
+      lock_state_ = manager_->TryAcquirePartitionLock(txn_id, &code_, 
MonoDelta::FromSeconds(1));
+      break;
+    case LockManager::WAIT_FOR_LOCK:
+      lock_state_ = manager_->WaitUntilAcquiredPartitionLock(txn_id);
+      DCHECK(lock_state_);
+      break;
+    default:
+      LOG(DFATAL) << "not reachable";
+      break;
+  }
+}
+
+ScopedPartitionLock::~ScopedPartitionLock() {
+  if (manager_) {
+    Release();
+  }
+}
+
+bool ScopedPartitionLock::IsAcquired(tserver::TabletServerErrorPB::Code* code) 
const {
+  if (lock_state_) {
+    return true;
+  }
+  *code = code_;
+  return false;
+}
+
+void ScopedPartitionLock::Release() {
+  // Already released.
+  if (!lock_state_) {
+    return;
+  }
+  manager_->ReleasePartitionLock();
+  lock_state_ = nullptr;
+}
+
+ScopedPartitionLock::ScopedPartitionLock(ScopedPartitionLock&& other) noexcept 
{
+  TakeState(&other);
+}
+
+ScopedPartitionLock& ScopedPartitionLock::operator=(ScopedPartitionLock&& 
other) noexcept {
+  TakeState(&other);
+  return *this;
+}
+
+void ScopedPartitionLock::TakeState(ScopedPartitionLock* other) {
+  DCHECK(other != this);
+  manager_ = other->manager_;
+  lock_state_ = other->lock_state_;
+  code_ = other->code_;
+  other->lock_state_ = nullptr;
+}
+
+// ============================================================================
 //  LockManager
 // ============================================================================
 
 LockManager::LockManager()
-  : locks_(new LockTable()) {
+  : partition_sem_(1),
+    partition_lock_refs_(0),
+    locks_(new LockTable) {
 }
 
 LockManager::~LockManager() {
   delete locks_;
 }
 
+PartitionLockState* LockManager::TryAcquirePartitionLock(
+    const TxnId& txn_id,
+    TabletServerErrorPB::Code* code,
+    const MonoDelta& timeout) {
+  // Favor transactional ops over non-transactional ones by giving a
+  // non-transactional ops the maximum txn ID. We favor transactional ops here
+  // because aborting and retrying a transaction likely entails retrying
+  // several ops.
+  //
+  // TODO(hao): this may result in lock starvation for non-transactional ops.
+  // We should evaluate strategies to avoid this.
+  const auto requested_id = txn_id.IsValid() ?
+      txn_id.value() : std::numeric_limits<int64_t>::max();
+
+  // The most anticipated case is the lock is being re-acquired multiple times.
+  {
+    std::lock_guard<simple_spinlock> l(p_lock_);
+    if (partition_lock_ &&
+        PREDICT_TRUE(requested_id == partition_lock_->txn_id().value())) {
+      DCHECK_GT(partition_lock_refs_, 0);
+      DCHECK_GE(0, partition_sem_.GetValue());
+      partition_lock_refs_ += 1;
+      return partition_lock_.get();
+    }
+  }
+
+  // We expect low contention, so use TryAcquire first so we don't have to do a
+  // syscall to get the current time.
+  if (!partition_sem_.TryAcquire()) {
+    const MonoTime start(MonoTime::Now());
+    while (!partition_sem_.TimedAcquire(MonoDelta::FromMilliseconds(250))) {
+      bool has_timeout = timeout.Initialized();
+      MonoDelta elapsed;
+      if (has_timeout) {
+        elapsed = MonoTime::Now() - start;
+      }
+      if (has_timeout && elapsed > timeout) {
+        LOG(WARNING) << Substitute("Txn $0 has not acquired the partition lock 
after $1ms",
+                                  requested_id, elapsed.ToMilliseconds());
+        // If we're still unable to take 'partition_sem_', but 'partition_lock_'
+        // is unset, just try again -- another thread is likely in the midst of
+        // unsetting it and 'partition_sem_' should be available soon.
+        std::lock_guard<simple_spinlock> l(p_lock_);
+        if (!partition_lock_) {
+          continue;
+        }
+        // If the requestor requires a lock held by another transaction, abort
+        // the requested transaction immediately if it has a higher txn ID than
+        // the transaction holding the lock. Otherwise, let the requestor 
retry.
+        //
+        // TODO(hao): generalize deadlock prevention scheme when adding new
+        // scheme or lock type.
+        *code = requested_id > partition_lock_->txn_id().value() ?
+            TabletServerErrorPB::TXN_LOCKED_ABORT : 
TabletServerErrorPB::TXN_LOCKED_RETRY_OP;
+        return nullptr;
+      }
+    }
+  }
+  std::lock_guard<simple_spinlock> l(p_lock_);
+  DCHECK_GE(0, partition_sem_.GetValue());
+  DCHECK(!partition_lock_);
+  DCHECK_EQ(partition_lock_refs_, 0);
+  // No one is holding the lock -- take it now.
+  partition_lock_.reset(new PartitionLockState(requested_id));
+  partition_lock_refs_ = 1;
+  return partition_lock_.get();
+}
+
+PartitionLockState* LockManager::WaitUntilAcquiredPartitionLock(const TxnId& 
txn_id) {
+  MicrosecondsInt64 start_wait_us = GetMonoTimeMicros();
+  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+  PartitionLockState* lock = TryAcquirePartitionLock(txn_id, &code);
+  CHECK(lock);
+  MicrosecondsInt64 wait_us = GetMonoTimeMicros() - start_wait_us;
+  TRACE_COUNTER_INCREMENT("partition_lock_wait_us", wait_us);
+  if (wait_us > 100 * 1000) {
+    TRACE("Waited $0us to acquire the partition lock", wait_us);
+  }
+  return lock;
+}
+
 std::vector<LockEntry*> LockManager::LockBatch(ArrayView<Slice> keys, const 
OpState* op) {
   vector<LockEntry*> entries = locks_->GetLockEntries(keys);
 
@@ -378,6 +543,15 @@ std::vector<LockEntry*> 
LockManager::LockBatch(ArrayView<Slice> keys, const OpSt
 
 void LockManager::ReleaseBatch(ArrayView<LockEntry*> locks) { 
locks_->ReleaseLockEntries(locks); }
 
+void LockManager::ReleasePartitionLock() {
+  std::lock_guard<simple_spinlock> l(p_lock_);
+  DCHECK_GT(partition_lock_refs_, 0);
+  if (--partition_lock_refs_ == 0) {
+    partition_sem_.unlock();
+    partition_lock_.reset();
+  }
+}
+
 void LockManager::AcquireLockOnEntry(LockEntry* entry, const OpState* op) {
   // We expect low contention, so just try to try_lock first. This is faster
   // than a timed_lock, since we don't have to do a syscall to get the current
@@ -386,7 +560,6 @@ void LockManager::AcquireLockOnEntry(LockEntry* entry, 
const OpState* op) {
     // If the current holder of this lock is the same op just increment
     // the recursion count without acquiring the mutex.
     //
-    //
     // NOTE: This is not a problem for the current way locks are managed since
     // they are obtained and released in bulk (all locks for an op are
     // obtained and released at the same time). If at any time in the future
diff --git a/src/kudu/tablet/lock_manager.h b/src/kudu/tablet/lock_manager.h
index 3f761a4..e7e6a75 100644
--- a/src/kudu/tablet/lock_manager.h
+++ b/src/kudu/tablet/lock_manager.h
@@ -14,32 +14,43 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TABLET_LOCK_MANAGER_H
-#define KUDU_TABLET_LOCK_MANAGER_H
+#pragma once
 
+#include <cstdint>
+#include <memory>
 #include <vector>
 
+#include "kudu/common/txn_id.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/semaphore.h"
 #include "kudu/util/slice.h"
 
 namespace kudu {
+
 template <typename T>
 class ArrayView;
-}  // namespace kudu
 
-namespace kudu { namespace tablet {
+namespace tablet {
 
 class LockEntry;
 class LockTable;
 class OpState;
+class PartitionLockState;
 
-// Super-simple lock manager implementation. This only supports exclusive
-// locks, and makes no attempt to prevent deadlocks if a single thread
-// takes multiple locks.
+// Lock manager implementation that only supports exclusive locks. It is
+// composed of two types of lock: 'ScopedRowLock' for single-row lock and
+// 'ScopedPartitionLock' for partition lock.
 //
-// In the future when we want to support multi-row transactions of some kind
-// we'll have to implement a proper lock manager with all its trappings,
-// but this should be enough for the single-row use case.
+// For deadlock prevention of multi-row transactions, a wait-die scheme is
+// used. When a transaction B attempts to take a lock that's already held by
+// transaction A:
+// - B > A: transaction B should be aborted, and applications should retry
+//   B at later time, signified by the returning of TXN_LOCKED_ABORT.
+// - B < A: transaction B should wait for the lock to be taken by retrying the
+//   transactional op, signified by the returning of TXN_LOCKED_RETRY_OP.
 class LockManager {
  public:
   LockManager();
@@ -49,7 +60,21 @@ class LockManager {
     LOCK_EXCLUSIVE
   };
 
+  enum LockWaitMode {
+    // The attempt to take the lock must wait until the lock is taken. This is
+    // only appropriate if it is guaranteed that the lock can be waited on
+    // without a deadlock.
+    WAIT_FOR_LOCK,
+
+    // Try to acquire the lock with a timeout; if the lock is not available,
+    // return without acquiring it.
+    TRY_LOCK,
+  };
+
+ int64_t partition_lock_refs() const { return partition_lock_refs_; }
+
  private:
+  friend class ScopedPartitionLock;
   friend class ScopedRowLock;
   friend class LockManagerTest;
 
@@ -59,8 +84,48 @@ class LockManager {
   void Release(LockEntry* lock);
   void ReleaseBatch(ArrayView<LockEntry*> locks);
 
+  // Tries to acquire the partition lock with the given txn ID with the given
+  // timeout, or tries indefinitely if no timeout is set. A partition lock can
+  // only be held by a single transaction at a time; the same transaction can
+  // acquire the lock multiple times. Both transactional and non-transactional
+  // ops must try to acquire the lock (non-transactional ops are signified with
+  // an invalid 'txn_id').
+  //
+  // If the attempt to lock fails, an appropriate error code is returned based
+  // on the transaction ID and the deadlock prevention policy described above.
+  PartitionLockState* TryAcquirePartitionLock(const TxnId& txn_id,
+                                              
tserver::TabletServerErrorPB::Code* code,
+                                              const MonoDelta& timeout = 
MonoDelta());
+
+  // Similar to the above, but waits until the lock is acquired.
+  //
+  // Note that the caller is expected to ensure there is no deadlock. For
+  // example, when running on followers in the prepare phase, or running
+  // serially in an order that has already been successful with
+  // TryAcquirePartitionLock() calls.
+  PartitionLockState* WaitUntilAcquiredPartitionLock(const TxnId& txn_id);
+  void ReleasePartitionLock();
+
   static void AcquireLockOnEntry(LockEntry* e, const OpState* op);
 
+  // Semaphore used by the LockManager to signal the release of the partition
+  // lock. If its value is <= 0, the partition lock is already held, and
+  // callers can use TimedAcquire() to wait for it to be released.
+  Semaphore partition_sem_;
+
+  // Lock to protect 'partition_lock_' and 'partition_lock_refs_'.
+  simple_spinlock p_lock_;
+
+  // If 'partition_lock_' has been held by a transaction,
+  // 'partition_lock_refs_' keeps track of the number of times that the
+  // partition lock has been held by that transaction.
+  //
+  // NOTE: 'partition_lock_' is only set to non-null by a single thread (i.e.
+  // the prepare thread), but it may be released from a different thread (e.g.
+  // an apply thread).
+  int64_t partition_lock_refs_;
+  std::unique_ptr<PartitionLockState> partition_lock_;
+
   LockTable *locks_;
 
   DISALLOW_COPY_AND_ASSIGN(LockManager);
@@ -120,6 +185,56 @@ class ScopedRowLock {
   std::vector<LockEntry*> entries_;
 };
 
+// Similar to ScopedRowLock, holds a lock on a partition for the scope of
+// this object. Usage (note the lock must be bound to a named variable --
+// an unnamed temporary would release the lock at the end of the statement):
+//  {
+//    ScopedPartitionLock l(&manager, txn_id);
+//    .. do stuff ..
+//  }
+//  // lock is released when the object exits its scope.
+class ScopedPartitionLock {
+ public:
+  // Construct an initially-unlocked lock holder.
+  // You can later assign this to actually hold a lock using
+  // the move-constructor:
+  //   ScopedPartitionLock l;
+  //   l = ScopedPartitionLock(...);
+  // or
+  //   l = std::move(other_partition_lock);
+  ScopedPartitionLock()
+      : code_(tserver::TabletServerErrorPB::UNKNOWN_ERROR) {}
+
+  // 'wait_mode' indicates whether or not to wait until
+  // the lock is acquired.
+  ScopedPartitionLock(LockManager* manager,
+      const TxnId& txn_id,
+      LockManager::LockWaitMode wait_mode = LockManager::TRY_LOCK);
+  ~ScopedPartitionLock();
+
+  // Move constructor and assignment operator.
+  ScopedPartitionLock(ScopedPartitionLock&& other) noexcept;
+  ScopedPartitionLock& operator=(ScopedPartitionLock&& other) noexcept;
+
+  // Disable the copy constructor.
+  ScopedPartitionLock(const ScopedPartitionLock&) = delete;
+
+  // Check whether the partition lock is acquired by the transaction.
+  // If false, set the tablet server error code accordingly to abort
+  // or retry the transaction. Otherwise, no error code is set.
+  bool IsAcquired(tserver::TabletServerErrorPB::Code* code) const;
+
+  // Release a reference of the partition lock held by the transaction.
+  void Release();
+
+ private:
+  void TakeState(ScopedPartitionLock* other);
+
+  LockManager* manager_ = nullptr;
+  PartitionLockState* lock_state_ = nullptr;
+  // The tablet server error code is only set when the lock
+  // is not acquired. Otherwise, the default is 'UNKNOWN_ERROR'.
+  tserver::TabletServerErrorPB::Code code_;
+};
+
 } // namespace tablet
 } // namespace kudu
-#endif
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 4bd6c6b..9721bf8 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -109,6 +109,13 @@ message TabletServerErrorPB {
 
     // The requested transaction participant op was already applied.
     TXN_OP_ALREADY_APPLIED = 23;
+
+    // The requested transaction needs to be aborted for deadlock prevention.
+    TXN_LOCKED_ABORT = 24;
+
+    // The requested transaction participant op or write op needs to be
+    // retried, because the required lock is held by another transaction.
+    TXN_LOCKED_RETRY_OP = 25;
   }
 
   // The error code.

Reply via email to