This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 94eb31cf fix: should remove the db lock from the context to prevent blocking user requests (#2597)
94eb31cf is described below

commit 94eb31cf98b80c3612bbc232aa3c1bb1d735f183
Author: hulk <[email protected]>
AuthorDate: Tue Oct 15 00:48:13 2024 +0800

    fix: should remove the db lock from the context to prevent blocking user requests (#2597)
---
 src/storage/batch_indexer.h |  3 ++-
 src/storage/storage.cc      | 27 ++++++++++++------------
 src/storage/storage.h       | 50 ++++++++++++++++++++++++---------------------
 3 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/src/storage/batch_indexer.h b/src/storage/batch_indexer.h
index ceba8bed..99fc7f87 100644
--- a/src/storage/batch_indexer.h
+++ b/src/storage/batch_indexer.h
@@ -39,7 +39,8 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler 
{
     DCHECK_NOTNULL(dest_batch);
     DCHECK_NOTNULL(snapshot);
   }
-  explicit WriteBatchIndexer(engine::Context& ctx) : 
WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.snapshot) {}
+  explicit WriteBatchIndexer(engine::Context& ctx)
+      : WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}
   rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, 
const rocksdb::Slice& value) override {
     return 
dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)),
 key, value);
   }
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index f4627eae..116b4527 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -599,7 +599,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const 
rocksdb::ReadOptions &o
                              std::string *value) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
   }
   rocksdb::Status s;
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -624,7 +624,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const 
rocksdb::ReadOptions &o
                              rocksdb::PinnableSlice *value) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
   }
   rocksdb::Status s;
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -657,7 +657,7 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context 
&ctx, const rocksdb::Rea
                                         rocksdb::ColumnFamilyHandle 
*column_family) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
   }
   auto iter = db_->NewIterator(options, column_family);
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -673,7 +673,7 @@ void Storage::MultiGet(engine::Context &ctx, const 
rocksdb::ReadOptions &options
                        rocksdb::PinnableSlice *values, rocksdb::Status 
*statuses) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), 
options.snapshot->GetSequenceNumber());
   }
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
     txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, 
column_family, num_keys, keys, values, statuses,
@@ -1274,30 +1274,29 @@ bool Storage::ReplDataManager::FileExists(Storage 
*storage, const std::string &d
   return crc == tmp_crc;
 }
 
-[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
   rocksdb::ReadOptions read_options;
-  if (is_txn_mode) read_options.snapshot = snapshot;
+  if (is_txn_mode) read_options.snapshot = GetSnapshot();
   return read_options;
 }
 
-[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
   rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
-  if (is_txn_mode) read_options.snapshot = snapshot;
+  if (is_txn_mode) read_options.snapshot = GetSnapshot();
   return read_options;
 }
 
-[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
   rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
-  if (is_txn_mode) read_options.snapshot = snapshot;
+  if (is_txn_mode) read_options.snapshot = GetSnapshot();
   return read_options;
 }
 
 void Context::RefreshLatestSnapshot() {
-  auto guard = storage->WriteLockGuard();
-  if (snapshot) {
-    storage->GetDB()->ReleaseSnapshot(snapshot);
+  if (snapshot_) {
+    storage->GetDB()->ReleaseSnapshot(snapshot_);
   }
-  snapshot = storage->GetDB()->GetSnapshot();
+  snapshot_ = storage->GetDB()->GetSnapshot();
   if (batch) {
     batch->Clear();
   }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 5c3baa0b..3ce9b78a 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -381,11 +381,7 @@ class Storage {
 /// Context does not provide thread safety guarantees and is generally only 
passed as a parameter between APIs.
 struct Context {
   engine::Storage *storage = nullptr;
-  /// If is_txn_mode is true, snapshot should be specified instead of nullptr 
when used,
-  /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
-  /// Normally it will be fixed to the latest Snapshot when the Context is 
constructed.
-  /// If is_txn_mode is false, the snapshot is nullptr.
-  const rocksdb::Snapshot *snapshot = nullptr;
+
   std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;
 
   /// is_txn_mode is used to determine whether the current Context is in 
transactional mode,
@@ -398,30 +394,23 @@ struct Context {
 
   /// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, 
then its snapshot is specified by the
   /// Context
-  [[nodiscard]] rocksdb::ReadOptions GetReadOptions() const;
+  [[nodiscard]] rocksdb::ReadOptions GetReadOptions();
   /// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = 
true, then its snapshot is specified by the
   /// Context. Otherwise it is the same as Storage::DefaultScanOptions
-  [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions() const;
+  [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
   /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if 
is_txn_mode = true, then its snapshot is specified
   /// by the Context. Otherwise it is the same as 
Storage::DefaultMultiGetOptions
-  [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions() const;
+  [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();
 
   void RefreshLatestSnapshot();
 
   /// TODO: Change it to defer getting the context, and the snapshot is pinned 
after the first read operation
-  explicit Context(engine::Storage *storage) : storage(storage) {
-    auto guard = storage->ReadLockGuard();
-    if (!storage->GetConfig()->txn_context_enabled) {
-      is_txn_mode = false;
-      return;
-    }
-    snapshot = storage->GetDB()->GetSnapshot();  // NOLINT
-  }
+  explicit Context(engine::Storage *storage)
+      : storage(storage), 
is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
   ~Context() {
     if (storage) {
-      auto guard = storage->WriteLockGuard();
-      if (storage->GetDB() && snapshot) {
-        storage->GetDB()->ReleaseSnapshot(snapshot);
+      if (snapshot_ && storage->GetDB()) {
+        storage->GetDB()->ReleaseSnapshot(snapshot_);
       }
     }
   }
@@ -430,22 +419,37 @@ struct Context {
   Context &operator=(Context &&ctx) noexcept {
     if (this != &ctx) {
       storage = ctx.storage;
-      snapshot = ctx.snapshot;
+      snapshot_ = ctx.snapshot_;
       batch = std::move(ctx.batch);
 
       ctx.storage = nullptr;
-      ctx.snapshot = nullptr;
+      ctx.snapshot_ = nullptr;
     }
     return *this;
   }
-  Context(Context &&ctx) noexcept : storage(ctx.storage), 
snapshot(ctx.snapshot), batch(std::move(ctx.batch)) {
+  Context(Context &&ctx) noexcept : storage(ctx.storage), 
batch(std::move(ctx.batch)), snapshot_(ctx.snapshot_) {
     ctx.storage = nullptr;
-    ctx.snapshot = nullptr;
+    ctx.snapshot_ = nullptr;
+  }
+
+  // GetSnapshot will create a snapshot first if it doesn't exist,
+  // and it's not a thread-safe operation.
+  const rocksdb::Snapshot *GetSnapshot() {
+    if (snapshot_ == nullptr) {
+      snapshot_ = storage->GetDB()->GetSnapshot();  // NOLINT
+    }
+    return snapshot_;
   }
 
  private:
   /// It is only used by NonTransactionContext
   explicit Context(engine::Storage *storage, bool txn_mode) : 
storage(storage), is_txn_mode(txn_mode) {}
+
+  /// If is_txn_mode is true, snapshot should be specified instead of nullptr 
when used,
+  /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
+  /// It is created lazily by the first GetSnapshot() call instead of when the Context is constructed.
+  /// If is_txn_mode is false, the snapshot is nullptr.
+  const rocksdb::Snapshot *snapshot_ = nullptr;
 };
 
 }  // namespace engine

Reply via email to