This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch 2.10
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
commit faa593cba5a9ae1e8a714391d076046cb77c8557
Author: hulk <[email protected]>
AuthorDate: Tue Oct 15 00:48:13 2024 +0800

    fix: should remove the db lock from the context to prevent blocking user requests (#2597)
---
 src/storage/batch_indexer.h |  3 ++-
 src/storage/storage.cc      | 27 ++++++++++++------------
 src/storage/storage.h       | 50 ++++++++++++++++++++++++---------------------
 3 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/src/storage/batch_indexer.h b/src/storage/batch_indexer.h
index ceba8bed..99fc7f87 100644
--- a/src/storage/batch_indexer.h
+++ b/src/storage/batch_indexer.h
@@ -39,7 +39,8 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
     DCHECK_NOTNULL(dest_batch);
     DCHECK_NOTNULL(snapshot);
   }
-  explicit WriteBatchIndexer(engine::Context& ctx) : WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.snapshot) {}
+  explicit WriteBatchIndexer(engine::Context& ctx)
+      : WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}
   rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override {
     return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value);
   }
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index f4627eae..116b4527 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -599,7 +599,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
                              std::string *value) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   rocksdb::Status s;
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -624,7 +624,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
                              rocksdb::PinnableSlice *value) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   rocksdb::Status s;
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -657,7 +657,7 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea
                                         rocksdb::ColumnFamilyHandle *column_family) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   auto iter = db_->NewIterator(options, column_family);
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
@@ -673,7 +673,7 @@ void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options
                        rocksdb::PinnableSlice *values, rocksdb::Status *statuses) {
   if (ctx.is_txn_mode) {
     DCHECK_NOTNULL(options.snapshot);
-    DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
+    DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
   }
   if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
     txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
@@ -1274,30 +1274,29 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d
   return crc == tmp_crc;
 }

-[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
   rocksdb::ReadOptions read_options;
-  if (is_txn_mode) read_options.snapshot = snapshot;
+  if (is_txn_mode) read_options.snapshot = GetSnapshot();
   return read_options;
 }

-[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
   rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
-  if (is_txn_mode) read_options.snapshot = snapshot;
+  if (is_txn_mode) read_options.snapshot = GetSnapshot();
   return read_options;
 }

-[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() const {
+[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
   rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
-  if (is_txn_mode) read_options.snapshot = snapshot;
+  if (is_txn_mode) read_options.snapshot = GetSnapshot();
   return read_options;
 }

 void Context::RefreshLatestSnapshot() {
-  auto guard = storage->WriteLockGuard();
-  if (snapshot) {
-    storage->GetDB()->ReleaseSnapshot(snapshot);
+  if (snapshot_) {
+    storage->GetDB()->ReleaseSnapshot(snapshot_);
   }
-  snapshot = storage->GetDB()->GetSnapshot();
+  snapshot_ = storage->GetDB()->GetSnapshot();
   if (batch) {
     batch->Clear();
   }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 5c3baa0b..3ce9b78a 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -381,11 +381,7 @@ class Storage {
 /// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs.
 struct Context {
   engine::Storage *storage = nullptr;
-  /// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
-  /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
-  /// Normally it will be fixed to the latest Snapshot when the Context is constructed.
-  /// If is_txn_mode is false, the snapshot is nullptr.
-  const rocksdb::Snapshot *snapshot = nullptr;
+
   std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;

   /// is_txn_mode is used to determine whether the current Context is in transactional mode,
@@ -398,30 +394,23 @@ struct Context {

   /// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, then its snapshot is specified by the
   /// Context
-  [[nodiscard]] rocksdb::ReadOptions GetReadOptions() const;
+  [[nodiscard]] rocksdb::ReadOptions GetReadOptions();
   /// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = true, then its snapshot is specified by the
   /// Context. Otherwise it is the same as Storage::DefaultScanOptions
-  [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions() const;
+  [[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
   /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if is_txn_mode = true, then its snapshot is specified
   /// by the Context. Otherwise it is the same as Storage::DefaultMultiGetOptions
-  [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions() const;
+  [[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();

   void RefreshLatestSnapshot();

   /// TODO: Change it to defer getting the context, and the snapshot is pinned after the first read operation
-  explicit Context(engine::Storage *storage) : storage(storage) {
-    auto guard = storage->ReadLockGuard();
-    if (!storage->GetConfig()->txn_context_enabled) {
-      is_txn_mode = false;
-      return;
-    }
-    snapshot = storage->GetDB()->GetSnapshot();  // NOLINT
-  }
+  explicit Context(engine::Storage *storage)
+      : storage(storage), is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
   ~Context() {
     if (storage) {
-      auto guard = storage->WriteLockGuard();
-      if (storage->GetDB() && snapshot) {
-        storage->GetDB()->ReleaseSnapshot(snapshot);
+      if (snapshot_ && storage->GetDB()) {
+        storage->GetDB()->ReleaseSnapshot(snapshot_);
       }
     }
   }
@@ -430,22 +419,37 @@ struct Context {
   Context &operator=(Context &&ctx) noexcept {
     if (this != &ctx) {
       storage = ctx.storage;
-      snapshot = ctx.snapshot;
+      snapshot_ = ctx.snapshot_;
       batch = std::move(ctx.batch);

       ctx.storage = nullptr;
-      ctx.snapshot = nullptr;
+      ctx.snapshot_ = nullptr;
     }
     return *this;
   }
-  Context(Context &&ctx) noexcept : storage(ctx.storage), snapshot(ctx.snapshot), batch(std::move(ctx.batch)) {
+  Context(Context &&ctx) noexcept : storage(ctx.storage), batch(std::move(ctx.batch)), snapshot_(ctx.snapshot_) {
     ctx.storage = nullptr;
-    ctx.snapshot = nullptr;
+    ctx.snapshot_ = nullptr;
+  }
+
+  // GetSnapshot will create a snapshot first if it doesn't exist,
+  // and it's not a thread-safe operation.
+  const rocksdb::Snapshot *GetSnapshot() {
+    if (snapshot_ == nullptr) {
+      snapshot_ = storage->GetDB()->GetSnapshot();  // NOLINT
+    }
+    return snapshot_;
   }

  private:
   /// It is only used by NonTransactionContext
   explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), is_txn_mode(txn_mode) {}
+
+  /// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
+  /// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
+  /// Normally it will be fixed to the latest Snapshot when the Context is constructed.
+  /// If is_txn_mode is false, the snapshot is nullptr.
+  const rocksdb::Snapshot *snapshot_ = nullptr;
 };

 }  // namespace engine
