This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c65e505f fix(config): avoid rewriting the config file if it's
unnecessary (#2347)
c65e505f is described below
commit c65e505fa6fe1e248c93aedb2668b8fc7f415bd4
Author: hulk <[email protected]>
AuthorDate: Sun Jun 2 00:20:38 2024 +0800
fix(config): avoid rewriting the config file if it's unnecessary (#2347)
The server will start by rewriting the config file to persist
namespace/token pairs
if namespace replication is enabled. But it's unnecessary if the namespace
replication
is disabled or no tokens are loaded from the configuration file because the
purpose of
this rewrite is to remove tokens from the config file and write them to the
database.
---
src/commands/cmd_server.cc | 2 +-
src/config/config.cc | 11 +++-
src/server/namespace.cc | 93 ++++++++++++++++-----------
src/server/namespace.h | 10 ++-
tests/gocase/unit/namespace/namespace_test.go | 9 +++
5 files changed, 84 insertions(+), 41 deletions(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index bde60fa5..114eee15 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1275,7 +1275,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth",
2, "read-only ok-loadin
MakeCmdAttr<CommandInfo>("info", -1, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandRole>("role", 1, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandConfig>("config", -2, "read-only",
0, 0, 0, GenerateConfigFlag),
- MakeCmdAttr<CommandNamespace>("namespace", -3,
"read-only exclusive", 0, 0, 0),
+ MakeCmdAttr<CommandNamespace>("namespace", -3,
"read-only", 0, 0, 0),
MakeCmdAttr<CommandKeys>("keys", 2, "read-only", 0, 0,
0),
MakeCmdAttr<CommandFlushDB>("flushdb", 1, "write
no-dbsize-check", 0, 0, 0),
MakeCmdAttr<CommandFlushAll>("flushall", 1, "write
no-dbsize-check", 0, 0, 0),
diff --git a/src/config/config.cc b/src/config/config.cc
index 500ac717..4139dfc6 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -885,11 +885,20 @@ Status Config::Set(Server *srv, std::string key, const
std::string &value) {
if (!s.IsOK()) return s.Prefixed("invalid value");
}
+ auto origin_value = field->ToString();
auto s = field->Set(value);
if (!s.IsOK()) return s.Prefixed("failed to set new value");
if (field->callback) {
- return field->callback(srv, key, value);
+ s = field->callback(srv, key, value);
+ if (!s.IsOK()) {
+ // rollback the value if the callback failed
+ auto set_status = field->Set(origin_value);
+ if (!set_status.IsOK()) {
+ return set_status.Prefixed("failed to rollback the value");
+ }
+ }
+ return s;
}
return Status::OK();
diff --git a/src/server/namespace.cc b/src/server/namespace.cc
index 504ad523..31b169ed 100644
--- a/src/server/namespace.cc
+++ b/src/server/namespace.cc
@@ -51,40 +51,53 @@ bool Namespace::IsAllowModify() const {
return config->HasConfigFile() || config->repl_namespace_enabled;
}
+Status Namespace::loadFromDB(std::map<std::string, std::string>* db_tokens)
const {
+ std::string value;
+ auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value);
+ if (!s.ok()) {
+ if (s.IsNotFound()) return Status::OK();
+ return {Status::NotOK, s.ToString()};
+ }
+
+ jsoncons::json j = jsoncons::json::parse(value);
+ for (const auto& iter : j.object_range()) {
+ db_tokens->insert({iter.key(), iter.value().as_string()});
+ }
+ return Status::OK();
+}
+
Status Namespace::LoadAndRewrite() {
auto config = storage_->GetConfig();
// Namespace is NOT allowed in the cluster mode, so we don't need to rewrite
here.
if (config->cluster_enabled) return Status::OK();
- // Load from the configuration file first
- tokens_ = config->load_tokens;
+ std::map<std::string, std::string> db_tokens;
+ auto s = loadFromDB(&db_tokens);
+ if (!s.IsOK()) return s;
- // We would like to load namespaces from db even if repl_namespace_enabled
is false,
- // this can avoid missing some namespaces when turn on/off
repl_namespace_enabled.
- std::string value;
- auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value);
- if (!s.ok() && !s.IsNotFound()) {
- return {Status::NotOK, s.ToString()};
+ if (!db_tokens.empty() && !config->repl_namespace_enabled) {
+ return {Status::NotOK, "cannot switch off repl_namespace_enabled when
namespaces exist in db"};
}
- if (s.ok()) {
- // The namespace db key is existed, so it doesn't allow to switch off
repl_namespace_enabled
- if (!config->repl_namespace_enabled) {
- return {Status::NotOK, "cannot switch off repl_namespace_enabled when
namespaces exist in db"};
- }
- jsoncons::json j = jsoncons::json::parse(value);
- for (const auto& iter : j.object_range()) {
- if (tokens_.find(iter.key()) == tokens_.end()) {
- // merge the namespace from db
- tokens_[iter.key()] = iter.value().as<std::string>();
- }
+ std::unique_lock<std::shared_mutex> lock(tokens_mu_);
+ // Load from the configuration file first
+ tokens_ = config->load_tokens;
+ // Merge the tokens from the database if the token is not in the
configuration file
+ for (const auto& iter : db_tokens) {
+ if (tokens_.find(iter.first) == tokens_.end()) {
+ tokens_[iter.first] = iter.second;
}
}
- return Rewrite();
+ // The following rewrite is to remove namespace/token pairs from the
configuration if the namespace replication
+ // is enabled. So we don't need to do that if no tokens are loaded or the
namespace replication is disabled.
+ if (config->load_tokens.empty() || !config->repl_namespace_enabled) return
Status::OK();
+
+ return Rewrite(tokens_);
}
-StatusOr<std::string> Namespace::Get(const std::string& ns) const {
+StatusOr<std::string> Namespace::Get(const std::string& ns) {
+ std::shared_lock lock(tokens_mu_);
for (const auto& iter : tokens_) {
if (iter.second == ns) {
return iter.first;
@@ -93,7 +106,8 @@ StatusOr<std::string> Namespace::Get(const std::string& ns)
const {
return {Status::NotFound};
}
-StatusOr<std::string> Namespace::GetByToken(const std::string& token) const {
+StatusOr<std::string> Namespace::GetByToken(const std::string& token) {
+ std::shared_lock lock(tokens_mu_);
auto iter = tokens_.find(token);
if (iter == tokens_.end()) {
return {Status::NotFound};
@@ -121,6 +135,7 @@ Status Namespace::Set(const std::string& ns, const
std::string& token) {
return {Status::NotOK, kErrInvalidToken};
}
+ std::unique_lock lock(tokens_mu_);
for (const auto& iter : tokens_) {
if (iter.second == ns) { // need to delete the old token first
tokens_.erase(iter.first);
@@ -129,7 +144,7 @@ Status Namespace::Set(const std::string& ns, const
std::string& token) {
}
tokens_[token] = ns;
- s = Rewrite();
+ s = Rewrite(tokens_);
if (!s.IsOK()) {
tokens_.erase(token);
return s;
@@ -138,17 +153,22 @@ Status Namespace::Set(const std::string& ns, const
std::string& token) {
}
Status Namespace::Add(const std::string& ns, const std::string& token) {
- // duplicate namespace
- for (const auto& iter : tokens_) {
- if (iter.second == ns) {
- if (iter.first == token) return Status::OK();
- return {Status::NotOK, kErrNamespaceExists};
+ {
+ std::shared_lock lock(tokens_mu_);
+ // duplicate namespace
+ for (const auto& iter : tokens_) {
+ if (iter.second == ns) {
+ if (iter.first == token) return Status::OK();
+ return {Status::NotOK, kErrNamespaceExists};
+ }
+ }
+ // duplicate token
+ if (tokens_.find(token) != tokens_.end()) {
+ return {Status::NotOK, kErrTokenExists};
}
}
- // duplicate token
- if (tokens_.find(token) != tokens_.end()) {
- return {Status::NotOK, kErrTokenExists};
- }
+
+ // we don't need to lock the mutex here because the Set method will lock it
return Set(ns, token);
}
@@ -160,10 +180,11 @@ Status Namespace::Del(const std::string& ns) {
return {Status::NotOK, kErrCantModifyNamespace};
}
+ std::unique_lock lock(tokens_mu_);
for (const auto& iter : tokens_) {
if (iter.second == ns) {
tokens_.erase(iter.first);
- auto s = Rewrite();
+ auto s = Rewrite(tokens_);
if (!s.IsOK()) {
tokens_[iter.first] = iter.second;
return s;
@@ -174,11 +195,11 @@ Status Namespace::Del(const std::string& ns) {
return {Status::NotOK, kErrNamespaceNotFound};
}
-Status Namespace::Rewrite() {
+Status Namespace::Rewrite(const std::map<std::string, std::string>& tokens)
const {
auto config = storage_->GetConfig();
// Rewrite the configuration file only if it's running with the
configuration file
if (config->HasConfigFile()) {
- auto s = config->Rewrite(tokens_);
+ auto s = config->Rewrite(tokens);
if (!s.IsOK()) {
return s;
}
@@ -195,7 +216,7 @@ Status Namespace::Rewrite() {
return Status::OK();
}
jsoncons::json json;
- for (const auto& iter : tokens_) {
+ for (const auto& iter : tokens) {
json[iter.first] = iter.second;
}
return storage_->WriteToPropagateCF(kNamespaceDBKey, json.to_string());
diff --git a/src/server/namespace.h b/src/server/namespace.h
index c0556eeb..1eadd656 100644
--- a/src/server/namespace.h
+++ b/src/server/namespace.h
@@ -34,17 +34,21 @@ class Namespace {
Namespace &operator=(const Namespace &) = delete;
Status LoadAndRewrite();
- StatusOr<std::string> Get(const std::string &ns) const;
- StatusOr<std::string> GetByToken(const std::string &token) const;
+ StatusOr<std::string> Get(const std::string &ns);
+ StatusOr<std::string> GetByToken(const std::string &token);
Status Set(const std::string &ns, const std::string &token);
Status Add(const std::string &ns, const std::string &token);
Status Del(const std::string &ns);
const std::map<std::string, std::string> &List() const { return tokens_; }
- Status Rewrite();
+ Status Rewrite(const std::map<std::string, std::string> &tokens) const;
bool IsAllowModify() const;
private:
engine::Storage *storage_;
rocksdb::ColumnFamilyHandle *cf_ = nullptr;
+
+ std::shared_mutex tokens_mu_;
std::map<std::string, std::string> tokens_;
+
+ Status loadFromDB(std::map<std::string, std::string> *db_tokens) const;
};
diff --git a/tests/gocase/unit/namespace/namespace_test.go
b/tests/gocase/unit/namespace/namespace_test.go
index 1b80e5b9..755766bb 100644
--- a/tests/gocase/unit/namespace/namespace_test.go
+++ b/tests/gocase/unit/namespace/namespace_test.go
@@ -238,7 +238,16 @@ func TestNamespaceReplicate(t *testing.T) {
})
t.Run("Turn off namespace replication is not allowed", func(t
*testing.T) {
+ r := masterRdb.Do(ctx, "NAMESPACE", "ADD", "test-ns",
"ns-token")
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
util.ErrorRegexp(t, masterRdb.ConfigSet(ctx,
"repl-namespace-enabled", "no").Err(), ".*cannot switch off
repl_namespace_enabled when namespaces exist in db.*")
+
+ // it should be allowed after deleting all namespaces
+ r = masterRdb.Do(ctx, "NAMESPACE", "DEL", "test-ns")
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ require.NoError(t, masterRdb.ConfigSet(ctx,
"repl-namespace-enabled", "no").Err())
})
}