This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 94eb31cf fix: should remove the db lock from the context to prevent
blocking user requests (#2597)
94eb31cf is described below
commit 94eb31cf98b80c3612bbc232aa3c1bb1d735f183
Author: hulk <[email protected]>
AuthorDate: Tue Oct 15 00:48:13 2024 +0800
fix: should remove the db lock from the context to prevent blocking user
requests (#2597)
---
src/storage/batch_indexer.h | 3 ++-
src/storage/storage.cc | 27 ++++++++++++------------
src/storage/storage.h | 50 ++++++++++++++++++++++++---------------------
3 files changed, 42 insertions(+), 38 deletions(-)
diff --git a/src/storage/batch_indexer.h b/src/storage/batch_indexer.h
index ceba8bed..99fc7f87 100644
--- a/src/storage/batch_indexer.h
+++ b/src/storage/batch_indexer.h
@@ -39,7 +39,8 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler
{
DCHECK_NOTNULL(dest_batch);
DCHECK_NOTNULL(snapshot);
}
- explicit WriteBatchIndexer(engine::Context& ctx) :
WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.snapshot) {}
+ explicit WriteBatchIndexer(engine::Context& ctx)
+ : WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}
rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
const rocksdb::Slice& value) override {
return
dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)),
key, value);
}
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index f4627eae..116b4527 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -599,7 +599,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const
rocksdb::ReadOptions &o
std::string *value) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
- DCHECK_EQ(ctx.snapshot->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
+ DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -624,7 +624,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const
rocksdb::ReadOptions &o
rocksdb::PinnableSlice *value) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
- DCHECK_EQ(ctx.snapshot->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
+ DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -657,7 +657,7 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context
&ctx, const rocksdb::Rea
rocksdb::ColumnFamilyHandle
*column_family) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
- DCHECK_EQ(ctx.snapshot->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
+ DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
auto iter = db_->NewIterator(options, column_family);
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -673,7 +673,7 @@ void Storage::MultiGet(engine::Context &ctx, const
rocksdb::ReadOptions &options
rocksdb::PinnableSlice *values, rocksdb::Status
*statuses) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
- DCHECK_EQ(ctx.snapshot->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
+ DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options,
column_family, num_keys, keys, values, statuses,
@@ -1274,30 +1274,29 @@ bool Storage::ReplDataManager::FileExists(Storage
*storage, const std::string &d
return crc == tmp_crc;
}
-[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
rocksdb::ReadOptions read_options;
- if (is_txn_mode) read_options.snapshot = snapshot;
+ if (is_txn_mode) read_options.snapshot = GetSnapshot();
return read_options;
}
-[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
- if (is_txn_mode) read_options.snapshot = snapshot;
+ if (is_txn_mode) read_options.snapshot = GetSnapshot();
return read_options;
}
-[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
- if (is_txn_mode) read_options.snapshot = snapshot;
+ if (is_txn_mode) read_options.snapshot = GetSnapshot();
return read_options;
}
void Context::RefreshLatestSnapshot() {
- auto guard = storage->WriteLockGuard();
- if (snapshot) {
- storage->GetDB()->ReleaseSnapshot(snapshot);
+ if (snapshot_) {
+ storage->GetDB()->ReleaseSnapshot(snapshot_);
}
- snapshot = storage->GetDB()->GetSnapshot();
+ snapshot_ = storage->GetDB()->GetSnapshot();
if (batch) {
batch->Clear();
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 5c3baa0b..3ce9b78a 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -381,11 +381,7 @@ class Storage {
/// Context does not provide thread safety guarantees and is generally only
passed as a parameter between APIs.
struct Context {
engine::Storage *storage = nullptr;
- /// If is_txn_mode is true, snapshot should be specified instead of nullptr
when used,
- /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
- /// Normally it will be fixed to the latest Snapshot when the Context is
constructed.
- /// If is_txn_mode is false, the snapshot is nullptr.
- const rocksdb::Snapshot *snapshot = nullptr;
+
std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;
/// is_txn_mode is used to determine whether the current Context is in
transactional mode,
@@ -398,30 +394,23 @@ struct Context {
/// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true,
then its snapshot is specified by the
/// Context
- [[nodiscard]] rocksdb::ReadOptions GetReadOptions() const;
+ [[nodiscard]] rocksdb::ReadOptions GetReadOptions();
/// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode =
true, then its snapshot is specified by the
/// Context. Otherwise it is the same as Storage::DefaultScanOptions
- [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions() const;
+ [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
/// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if
is_txn_mode = true, then its snapshot is specified
/// by the Context. Otherwise it is the same as
Storage::DefaultMultiGetOptions
- [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions() const;
+ [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();
void RefreshLatestSnapshot();
/// TODO: Change it to defer getting the context, and the snapshot is pinned
after the first read operation
- explicit Context(engine::Storage *storage) : storage(storage) {
- auto guard = storage->ReadLockGuard();
- if (!storage->GetConfig()->txn_context_enabled) {
- is_txn_mode = false;
- return;
- }
- snapshot = storage->GetDB()->GetSnapshot(); // NOLINT
- }
+ explicit Context(engine::Storage *storage)
+ : storage(storage),
is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
~Context() {
if (storage) {
- auto guard = storage->WriteLockGuard();
- if (storage->GetDB() && snapshot) {
- storage->GetDB()->ReleaseSnapshot(snapshot);
+ if (snapshot_ && storage->GetDB()) {
+ storage->GetDB()->ReleaseSnapshot(snapshot_);
}
}
}
@@ -430,22 +419,37 @@ struct Context {
Context &operator=(Context &&ctx) noexcept {
if (this != &ctx) {
storage = ctx.storage;
- snapshot = ctx.snapshot;
+ snapshot_ = ctx.snapshot_;
batch = std::move(ctx.batch);
ctx.storage = nullptr;
- ctx.snapshot = nullptr;
+ ctx.snapshot_ = nullptr;
}
return *this;
}
- Context(Context &&ctx) noexcept : storage(ctx.storage),
snapshot(ctx.snapshot), batch(std::move(ctx.batch)) {
+ Context(Context &&ctx) noexcept : storage(ctx.storage),
batch(std::move(ctx.batch)), snapshot_(ctx.snapshot_) {
ctx.storage = nullptr;
- ctx.snapshot = nullptr;
+ ctx.snapshot_ = nullptr;
+ }
+
+ // GetSnapshot will create a snapshot first if it doesn't exist,
+ // and it's not a thread-safe operation.
+ const rocksdb::Snapshot *GetSnapshot() {
+ if (snapshot_ == nullptr) {
+ snapshot_ = storage->GetDB()->GetSnapshot(); // NOLINT
+ }
+ return snapshot_;
}
private:
/// It is only used by NonTransactionContext
explicit Context(engine::Storage *storage, bool txn_mode) :
storage(storage), is_txn_mode(txn_mode) {}
+
+ /// snapshot_ is created lazily by GetSnapshot() on first use (or replaced by
+ /// RefreshLatestSnapshot()) and released in the destructor; it stays nullptr
+ /// until then. In txn mode it should match the snapshot in ReadOptions to
+ /// avoid ambiguity.
+ const rocksdb::Snapshot *snapshot_ = nullptr;
};
} // namespace engine