This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 1974d986 chore(enhancement): rename is_txn_mode to txn_context_enabled
(#2644)
1974d986 is described below
commit 1974d9868f1230763fe760a590105c6500fca4cc
Author: mwish <[email protected]>
AuthorDate: Thu Nov 14 12:33:57 2024 +0800
chore(enhancement): rename is_txn_mode to txn_context_enabled (#2644)
---
src/cluster/slot_migrate.h | 2 +-
src/commands/cmd_txn.cc | 4 ++++
src/config/config.h | 2 +-
src/server/redis_connection.cc | 2 +-
src/storage/batch_indexer.h | 4 +++-
src/storage/storage.cc | 29 ++++++++++++++-----------
src/storage/storage.h | 49 ++++++++++++++++++++++++++++--------------
7 files changed, 59 insertions(+), 33 deletions(-)
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 1114f2a1..6d323b6b 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -45,7 +45,7 @@
enum class MigrationType {
/// Use Redis commands to migrate data.
- /// It will trying to extract commands from existing data and log, then
replay
+ /// It will try to extract commands from existing data and log, then replay
/// them on the destination node.
kRedisCommand = 0,
/// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
diff --git a/src/commands/cmd_txn.cc b/src/commands/cmd_txn.cc
index 4f3cb3a4..3d88d9a2 100644
--- a/src/commands/cmd_txn.cc
+++ b/src/commands/cmd_txn.cc
@@ -85,6 +85,10 @@ class CommandExec : public Commander {
auto s = storage->BeginTxn();
if (s.IsOK()) {
conn->ExecuteCommands(conn->GetMultiExecCommands());
+ // In Redis, errors happening after EXEC instead are not handled in a
special way:
+ // all the other commands will be executed even if some command fails
during
+ // the transaction.
+ // So, if conn->IsMultiError(), the transaction should still be
committed.
s = storage->CommitTxn();
}
return s;
diff --git a/src/config/config.h b/src/config/config.h
index 61ac0cf8..3b9d99de 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -54,7 +54,7 @@ constexpr const size_t GiB = 1024L * MiB;
constexpr const uint32_t kDefaultPort = 6666;
constexpr const char *kDefaultNamespace = "__namespace";
-constexpr const size_t KVROCKS_MAX_LSM_LEVEL = 7;
+constexpr int KVROCKS_MAX_LSM_LEVEL = 7;
enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC };
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 54899afe..e6bef06d 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -464,7 +464,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
// We don't execute commands, but queue them, and then execute in EXEC
command
if (is_multi_exec && !in_exec_ && !(cmd_flags & kCmdEndMulti)) {
- multi_cmds_.emplace_back(cmd_tokens);
+ multi_cmds_.emplace_back(std::move(cmd_tokens));
Reply(redis::SimpleString("QUEUED"));
continue;
}
diff --git a/src/storage/batch_indexer.h b/src/storage/batch_indexer.h
index 99fc7f87..7d0fe170 100644
--- a/src/storage/batch_indexer.h
+++ b/src/storage/batch_indexer.h
@@ -29,7 +29,8 @@
#include "storage.h"
-// WriteBatchIndexer traverses the operations in WriteBatch and appends to the
specified WriteBatchWithIndex
+/// WriteBatchIndexer traverses the operations in WriteBatch and appends to the
+/// specified WriteBatchWithIndex.
class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
public:
explicit WriteBatchIndexer(engine::Storage* storage,
rocksdb::WriteBatchWithIndex* dest_batch,
@@ -41,6 +42,7 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler
{
}
explicit WriteBatchIndexer(engine::Context& ctx)
: WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}
+
rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
const rocksdb::Slice& value) override {
return
dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)),
key, value);
}
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index a446e3f0..36b027a6 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -597,14 +597,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const
rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions
&options,
rocksdb::ColumnFamilyHandle *column_family, const
rocksdb::Slice &key,
std::string *value) {
- if (ctx.is_txn_mode) {
+ if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family,
key, value);
- } else if (ctx.batch && ctx.is_txn_mode) {
+ } else if (ctx.batch && ctx.txn_context_enabled) {
s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key,
value);
} else {
s = db_->Get(options, column_family, key, value);
@@ -622,14 +622,14 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const
rocksdb::ReadOptions &o
rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions
&options,
rocksdb::ColumnFamilyHandle *column_family, const
rocksdb::Slice &key,
rocksdb::PinnableSlice *value) {
- if (ctx.is_txn_mode) {
+ if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
s = txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family,
key, value);
- } else if (ctx.is_txn_mode && ctx.batch) {
+ } else if (ctx.txn_context_enabled && ctx.batch) {
s = ctx.batch->GetFromBatchAndDB(db_.get(), options, column_family, key,
value);
} else {
s = db_->Get(options, column_family, key, value);
@@ -655,14 +655,14 @@ void Storage::recordKeyspaceStat(const
rocksdb::ColumnFamilyHandle *column_famil
rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const
rocksdb::ReadOptions &options,
rocksdb::ColumnFamilyHandle
*column_family) {
- if (ctx.is_txn_mode) {
+ if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
auto iter = db_->NewIterator(options, column_family);
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->NewIteratorWithBase(column_family, iter,
&options);
- } else if (ctx.is_txn_mode && ctx.batch &&
ctx.batch->GetWriteBatch()->Count() > 0) {
+ } else if (ctx.txn_context_enabled && ctx.batch &&
ctx.batch->GetWriteBatch()->Count() > 0) {
return ctx.batch->NewIteratorWithBase(column_family, iter, &options);
}
return iter;
@@ -671,14 +671,14 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context
&ctx, const rocksdb::Rea
void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions
&options,
rocksdb::ColumnFamilyHandle *column_family, const
size_t num_keys, const rocksdb::Slice *keys,
rocksdb::PinnableSlice *values, rocksdb::Status
*statuses) {
- if (ctx.is_txn_mode) {
+ if (ctx.txn_context_enabled) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(),
options.snapshot->GetSequenceNumber());
}
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options,
column_family, num_keys, keys, values, statuses,
false);
- } else if (ctx.is_txn_mode && ctx.batch) {
+ } else if (ctx.txn_context_enabled && ctx.batch) {
ctx.batch->MultiGetFromBatchAndDB(db_.get(), options, column_family,
num_keys, keys, values, statuses, false);
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses,
false);
@@ -700,18 +700,21 @@ rocksdb::Status Storage::Write(engine::Context &ctx,
const rocksdb::WriteOptions
rocksdb::Status Storage::writeToDB(engine::Context &ctx, const
rocksdb::WriteOptions &options,
rocksdb::WriteBatch *updates) {
- // Put replication id logdata at the end of write batch
+ // Put replication id logdata at the end of `updates`.
if (replid_.length() == kReplIdLength) {
updates->PutLogData(ServerLogData(kReplIdLog, replid_).Encode());
}
- if (ctx.is_txn_mode) {
+ if (ctx.txn_context_enabled) {
+ // Extract writes from the updates and append to the ctx.batch
if (ctx.batch == nullptr) {
ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>();
}
WriteBatchIndexer handle(ctx);
auto s = updates->Iterate(&handle);
if (!s.ok()) return s;
+ } else {
+ DCHECK(ctx.batch == nullptr);
}
return db_->Write(options, updates);
@@ -1278,19 +1281,19 @@ bool Storage::ReplDataManager::FileExists(Storage
*storage, const std::string &d
[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
rocksdb::ReadOptions read_options;
- if (is_txn_mode) read_options.snapshot = GetSnapshot();
+ if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}
[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
- if (is_txn_mode) read_options.snapshot = GetSnapshot();
+ if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}
[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
- if (is_txn_mode) read_options.snapshot = GetSnapshot();
+ if (txn_context_enabled) read_options.snapshot = GetSnapshot();
return read_options;
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index c29b4689..3a45ccd4 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -248,11 +248,12 @@ class Storage {
rocksdb::ColumnFamilyHandle *column_family);
rocksdb::Iterator *NewIterator(engine::Context &ctx, const
rocksdb::ReadOptions &options);
- [[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const
rocksdb::WriteOptions &options,
- rocksdb::WriteBatch *updates);
- const rocksdb::WriteOptions &DefaultWriteOptions() { return
default_write_opts_; }
+ const rocksdb::WriteOptions &DefaultWriteOptions() const { return
default_write_opts_; }
rocksdb::ReadOptions DefaultScanOptions() const;
rocksdb::ReadOptions DefaultMultiGetOptions() const;
+
+ [[nodiscard]] rocksdb::Status Write(engine::Context &ctx, const
rocksdb::WriteOptions &options,
+ rocksdb::WriteBatch *updates);
[[nodiscard]] rocksdb::Status Delete(engine::Context &ctx, const
rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle,
const rocksdb::Slice &key);
[[nodiscard]] rocksdb::Status DeleteRange(engine::Context &ctx, const
rocksdb::WriteOptions &options,
@@ -336,6 +337,9 @@ class Storage {
void SetDBInRetryableIOError(bool yes_or_no) { db_in_retryable_io_error_ =
yes_or_no; }
bool IsDBInRetryableIOError() const { return db_in_retryable_io_error_; }
+ /// Redis PSYNC relies on a Unique Replication Sequence Id when
use-rsid-psync
+ /// is enabled.
+ /// ShiftReplId would generate an Id and write it to propagate cf.
Status ShiftReplId(engine::Context &ctx);
std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
std::string GetReplIdFromDbEngine();
@@ -363,6 +367,8 @@ class Storage {
std::atomic<bool> db_in_retryable_io_error_{false};
+ // is_txn_mode_ is used to determine whether the current Storage is in
transactional mode,
+ // i.e., in "EXEC" command (CommandExec).
std::atomic<bool> is_txn_mode_ = false;
// txn_write_batch_ is used as the global write batch for the transaction
mode,
// all writes will be grouped in this write batch when entering the
transaction mode,
@@ -380,39 +386,48 @@ class Storage {
/// Context passes fixed snapshot and batch between APIs
///
-/// Limitations: Performing a large number of writes on the same Context may
reduce performance.
-/// Please choose to use the same Context or create a new Context based on the
actual situation.
+/// Limitations: Performing a large number of writes or applying operations like
DeleteRange
+/// on the same Context may reduce performance.
+/// Please choose to use the same Context or create a new Context based on the
actual
+/// situation.
///
/// Context does not provide thread safety guarantees and is generally only
passed as a parameter between APIs.
struct Context {
engine::Storage *storage = nullptr;
+ /// batch can be nullptr if
+ /// 1. The Context is not in transactional mode.
+ /// 2. The Context is in transactional mode, but no write operation is
performed.
std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;
- /// is_txn_mode is used to determine whether the current Context is in
transactional mode,
+ /// txn_context_enabled is used to determine whether the current Context is
in transactional mode,
/// if it is not transactional mode, then Context is equivalent to a Storage.
/// If the configuration of txn-context-enabled is no, it is false.
- bool is_txn_mode = true;
+ bool txn_context_enabled = true;
/// NoTransactionContext returns a Context with a is_txn_mode of false
- static Context NoTransactionContext(engine::Storage *storage) { return
Context(storage, false); }
+ static Context NoTransactionContext(engine::Storage *storage) { return
Context(storage, /*txn_mode=*/false); }
- /// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true,
then its snapshot is specified by the
- /// Context
+ /// GetReadOptions returns a default ReadOptions, and if txn_context_enabled
= true,
+ /// then its snapshot is specified by the Context.
+ /// Otherwise it is a default-constructed rocksdb::ReadOptions.
[[nodiscard]] rocksdb::ReadOptions GetReadOptions();
- /// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode =
true, then its snapshot is specified by the
- /// Context. Otherwise it is the same as Storage::DefaultScanOptions
+ /// DefaultScanOptions returns a DefaultScanOptions, and if
txn_context_enabled = true,
+ /// then its snapshot is specified by the Context.
+ /// Otherwise it is the same as Storage::DefaultScanOptions().
[[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
- /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if
is_txn_mode = true, then its snapshot is specified
- /// by the Context. Otherwise it is the same as
Storage::DefaultMultiGetOptions
+ /// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if
txn_context_enabled = true,
+ /// then its snapshot is specified by the Context.
+ /// Otherwise it is the same as Storage::DefaultMultiGetOptions().
[[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();
void RefreshLatestSnapshot();
/// TODO: Change it to defer getting the context, and the snapshot is pinned
after the first read operation
explicit Context(engine::Storage *storage)
- : storage(storage),
is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
+ : storage(storage),
txn_context_enabled(storage->GetConfig()->txn_context_enabled) {}
~Context() {
+ // A moved-from object doesn't have `storage`.
if (storage) {
if (snapshot_ && storage->GetDB()) {
storage->GetDB()->ReleaseSnapshot(snapshot_);
@@ -441,6 +456,8 @@ struct Context {
// and it's not a thread-safe operation.
const rocksdb::Snapshot *GetSnapshot() {
if (snapshot_ == nullptr) {
+ // Should not acquire a snapshot_ on a moved-from object.
+ DCHECK(storage != nullptr);
snapshot_ = storage->GetDB()->GetSnapshot(); // NOLINT
}
return snapshot_;
@@ -448,7 +465,7 @@ struct Context {
private:
/// It is only used by NonTransactionContext
- explicit Context(engine::Storage *storage, bool txn_mode) :
storage(storage), is_txn_mode(txn_mode) {}
+ explicit Context(engine::Storage *storage, bool txn_mode) :
storage(storage), txn_context_enabled(txn_mode) {}
/// If is_txn_mode is true, snapshot should be specified instead of nullptr
when used,
/// and should be consistent with snapshot in ReadOptions to avoid ambiguity.