This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f9728192 Add the support of the namespace replication (#1776)
f9728192 is described below
commit f97281928f51188b0592e93110981c8a739bbe64
Author: hulk <[email protected]>
AuthorDate: Mon Oct 9 14:15:52 2023 +0800
Add the support of the namespace replication (#1776)
Currently, Kvrocks doesn't propagate namespaces and tokens between the master
and slaves, which may cause trouble if users add them on the master but forget
to add them on the slaves. It would make this process easier if we could
propagate the namespace/token as well.
## Design
To implement this feature, we need to consider the following situations:
- How to replicate namespaces between the master and slaves
- How to be compatible with old versions
**For the replication:**
We now have the propagate column family to propagate commands between the
master and slaves, so we can use it to replicate namespaces as well. The steps
are like below:
1. Modify namespace/token in master(forbid changing in slaves)
2. Propagate the namespace event to notify slaves like the Lua script
3. Slaves replay namespace/token changes
This works well for new instances, but what if the master and slaves of an
existing deployment already have different namespaces? There are three
possible conditions:
1. Namespace/token exists in master but NOT in slave
2. Namespace exists in slave but NOT in master
3. Namespace exists both in master and slave, but the token is different
For condition 1, we can add the namespace/token to the slave as well. For
condition 2, it makes sense to follow the master, where the namespace does NOT
exist, since the data comes from the master, so we can remove the
namespace/token from the slave. The most troublesome condition is 3: users may
have already used the token to access the data, so changing the token would
cause an invalid password error. **To make this simple, I prefer also
overwriting the token.** And offer the configuration to allow users to enable or [...]
Note that we don't want to propagate the default namespace since it may
affect replication, so the default namespace is never persisted.
**For the compatibility issue:**
Currently, namespaces are kept in the configuration file, so we need to
read from RocksDB first (they are stored in the propagate column family, like
Lua scripts) when starting:
- If the key exists in rocksdb, load namespace/token from rocksdb and check
if any namespace/token is inside the configuration file. If yes, refuse to
start the server and ask users to remove them and add them via the `namespace
add` command.
- If the key does NOT exist in rocksdb, read namespace/token from the
configuration file and write them into rocksdb. Note that we need to write
an empty value for the key even when there is no namespace/token.
Users can only modify the namespace via command if this feature is enabled.
---
kvrocks.conf | 9 ++
src/cluster/replication.cc | 5 +
src/commands/cmd_server.cc | 35 ++---
src/config/config.cc | 128 +++---------------
src/config/config.h | 15 ++-
src/server/namespace.cc | 182 ++++++++++++++++++++++++++
src/server/namespace.h | 50 +++++++
src/server/server.cc | 12 +-
src/server/server.h | 5 +
src/storage/storage.h | 2 +-
tests/cppunit/config_test.cc | 142 +-------------------
tests/cppunit/namespace_test.cc | 71 ++++++++++
tests/cppunit/test_base.h | 9 +-
tests/gocase/unit/namespace/namespace_test.go | 144 +++++++++++++++++++-
14 files changed, 527 insertions(+), 282 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index d2b027cb..7089cec8 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -51,6 +51,15 @@ daemonize no
cluster-enabled no
+# By default, namespaces are stored in the configuration file and won't be replicated
+# to replicas. This option allows changing this behavior, so that namespaces are also
+# propagated to slaves. Note that:
+# 1) it won't replicate the 'masterauth' to prevent breaking master/replica replication
+# 2) it will overwrite replica's namespace with master's namespace, so be careful with namespaces that are in use
+# 3) cannot switch off the namespace replication once it's enabled
+#
+# Default: no
+repl-namespace-enabled no
# Persist the cluster nodes topology in local file($dir/nodes.conf). This
configuration
# takes effect only if the cluster mode was enabled.
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index a6110a86..c2d17b5a 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -975,6 +975,11 @@ Status ReplicationThread::parseWriteBatch(const
std::string &batch_string) {
return s.Prefixed("failed to execute propagate command");
}
}
+ } else if (write_batch_handler.Key() == kNamespaceDBKey) {
+ auto s = srv_->GetNamespace()->LoadAndRewrite();
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to load namespaces");
+ }
}
break;
case kBatchTypeStream: {
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 232b17c8..fe96074e 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -39,15 +39,15 @@ enum class AuthResult {
NO_REQUIRE_PASS,
};
-AuthResult AuthenticateUser(Connection *conn, Config *config, const
std::string &user_password) {
- auto iter = config->tokens.find(user_password);
- if (iter != config->tokens.end()) {
- conn->SetNamespace(iter->second);
+AuthResult AuthenticateUser(Server *srv, Connection *conn, const std::string
&user_password) {
+ auto ns = srv->GetNamespace()->GetByToken(user_password);
+ if (ns.IsOK()) {
+ conn->SetNamespace(ns.GetValue());
conn->BecomeUser();
return AuthResult::OK;
}
- const auto &requirepass = config->requirepass;
+ const auto &requirepass = srv->GetConfig()->requirepass;
if (!requirepass.empty() && user_password != requirepass) {
return AuthResult::INVALID_PASSWORD;
}
@@ -64,9 +64,8 @@ AuthResult AuthenticateUser(Connection *conn, Config *config,
const std::string
class CommandAuth : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
- Config *config = svr->GetConfig();
auto &user_password = args_[1];
- AuthResult result = AuthenticateUser(conn, config, user_password);
+ AuthResult result = AuthenticateUser(svr, conn, user_password);
switch (result) {
case AuthResult::OK:
*output = redis::SimpleString("OK");
@@ -89,10 +88,13 @@ class CommandNamespace : public Commander {
Config *config = svr->GetConfig();
std::string sub_command = util::ToLower(args_[1]);
+ if (config->repl_namespace_enabled && config->IsSlave() && sub_command !=
"get") {
+ return {Status::RedisExecErr, "namespace is read-only for slave"};
+ }
if (args_.size() == 3 && sub_command == "get") {
if (args_[2] == "*") {
std::vector<std::string> namespaces;
- auto tokens = config->tokens;
+ auto tokens = svr->GetNamespace()->List();
for (auto &token : tokens) {
namespaces.emplace_back(token.second); // namespace
namespaces.emplace_back(token.first); // token
@@ -101,26 +103,25 @@ class CommandNamespace : public Commander {
namespaces.emplace_back(config->requirepass);
*output = redis::MultiBulkString(namespaces, false);
} else {
- std::string token;
- auto s = config->GetNamespace(args_[2], &token);
- if (s.Is<Status::NotFound>()) {
+ auto token = svr->GetNamespace()->Get(args_[2]);
+ if (token.Is<Status::NotFound>()) {
*output = redis::NilString();
} else {
- *output = redis::BulkString(token);
+ *output = redis::BulkString(token.GetValue());
}
}
} else if (args_.size() == 4 && sub_command == "set") {
- Status s = config->SetNamespace(args_[2], args_[3]);
+ Status s = svr->GetNamespace()->Set(args_[2], args_[3]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " +
s.Msg());
LOG(WARNING) << "Updated namespace: " << args_[2] << " with token: " <<
args_[3] << ", addr: " << conn->GetAddr()
<< ", result: " << s.Msg();
} else if (args_.size() == 4 && sub_command == "add") {
- Status s = config->AddNamespace(args_[2], args_[3]);
+ Status s = svr->GetNamespace()->Add(args_[2], args_[3]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " +
s.Msg());
LOG(WARNING) << "New namespace: " << args_[2] << " with token: " <<
args_[3] << ", addr: " << conn->GetAddr()
<< ", result: " << s.Msg();
} else if (args_.size() == 3 && sub_command == "del") {
- Status s = config->DelNamespace(args_[2]);
+ Status s = svr->GetNamespace()->Del(args_[2]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " +
s.Msg());
LOG(WARNING) << "Deleted namespace: " << args_[2] << ", addr: " <<
conn->GetAddr() << ", result: " << s.Msg();
} else {
@@ -239,7 +240,7 @@ class CommandConfig : public Commander {
}
if (args_.size() == 2 && sub_command == "rewrite") {
- Status s = config->Rewrite();
+ Status s = config->Rewrite(svr->GetNamespace()->List());
if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};
*output = redis::SimpleString("OK");
@@ -709,7 +710,7 @@ class CommandHello final : public Commander {
next_arg++;
}
const auto &user_password = args_[next_arg + 1];
- auto auth_result = AuthenticateUser(conn, svr->GetConfig(),
user_password);
+ auto auth_result = AuthenticateUser(svr, conn, user_password);
switch (auth_result) {
case AuthResult::INVALID_PASSWORD:
return {Status::NotOK, "invalid password"};
diff --git a/src/config/config.cc b/src/config/config.cc
index e12edc38..ad922dd6 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -159,6 +159,7 @@ Config::Config() {
{"log-retention-days", false, new IntField(&log_retention_days, -1, -1,
INT_MAX)},
{"persist-cluster-nodes-enabled", false, new
YesNoField(&persist_cluster_nodes_enabled, true)},
{"redis-cursor-compatible", false, new
YesNoField(&redis_cursor_compatible, false)},
+ {"repl-namespace-enabled", false, new
YesNoField(&repl_namespace_enabled, false)},
/* rocksdb options */
{"rocksdb.compression", false,
@@ -233,17 +234,17 @@ void Config::initFieldValidator() {
std::map<std::string, ValidateFn> validators = {
{"requirepass",
[this](const std::string &k, const std::string &v) -> Status {
- if (v.empty() && !tokens.empty()) {
+ if (v.empty() && !load_tokens.empty()) {
return {Status::NotOK, "requirepass empty not allowed while the
namespace exists"};
}
- if (tokens.find(v) != tokens.end()) {
+ if (load_tokens.find(v) != load_tokens.end()) {
return {Status::NotOK, "requirepass is duplicated with namespace
tokens"};
}
return Status::OK();
}},
{"masterauth",
[this](const std::string &k, const std::string &v) -> Status {
- if (tokens.find(v) != tokens.end()) {
+ if (load_tokens.find(v) != load_tokens.end()) {
return {Status::NotOK, "masterauth is duplicated with namespace
tokens"};
}
return Status::OK();
@@ -515,6 +516,12 @@ void Config::initFieldCallback() {
remove(nodes_file_path.data());
return Status::OK();
}},
+ {"repl-namespace-enabled",
+ [](Server *srv, const std::string &k, const std::string &v) -> Status {
+ if (!srv) return Status::OK();
+ return srv->GetNamespace()->LoadAndRewrite();
+ }},
+
{"rocksdb.target_file_size_base",
[this](Server *srv, const std::string &k, const std::string &v) ->
Status {
if (!srv) return Status::OK();
@@ -682,7 +689,7 @@ Status Config::parseConfigFromPair(const
std::pair<std::string, std::string> &in
if (strncasecmp(input.first.data(), ns_str, ns_str_size) == 0) {
// namespace should keep key case-sensitive
field_key = input.first;
- tokens[input.second] = input.first.substr(ns_str_size);
+ load_tokens[input.second] = input.first.substr(ns_str_size);
return Status::OK();
}
@@ -711,10 +718,10 @@ Status Config::parseConfigFromString(const std::string
&input, int line_number)
}
Status Config::finish() {
- if (requirepass.empty() && !tokens.empty()) {
+ if (requirepass.empty() && !load_tokens.empty()) {
return {Status::NotOK, "requirepass empty wasn't allowed while the
namespace exists"};
}
- if ((cluster_enabled) && !tokens.empty()) {
+ if ((cluster_enabled) && !load_tokens.empty()) {
return {Status::NotOK, "enabled cluster mode wasn't allowed while the
namespace exists"};
}
if (unixsocket.empty() && binds.size() == 0) {
@@ -836,7 +843,7 @@ Status Config::Set(Server *svr, std::string key, const
std::string &value) {
return Status::OK();
}
-Status Config::Rewrite() {
+Status Config::Rewrite(const std::map<std::string, std::string> &tokens) {
if (path_.empty()) {
return {Status::NotOK, "the server is running without a config file"};
}
@@ -853,8 +860,10 @@ Status Config::Rewrite() {
}
std::string namespace_prefix = "namespace.";
- for (const auto &iter : tokens) {
- new_config[namespace_prefix + iter.second] = iter.first;
+ if (!repl_namespace_enabled) { // need to rewrite to the configuration if
we don't replicate namespaces
+ for (const auto &iter : tokens) {
+ new_config[namespace_prefix + iter.second] = iter.first;
+ }
}
std::ifstream file(path_);
@@ -900,104 +909,3 @@ Status Config::Rewrite() {
}
return Status::OK();
}
-
-Status Config::GetNamespace(const std::string &ns, std::string *token) const {
- token->clear();
- for (const auto &iter : tokens) {
- if (iter.second == ns) {
- *token = iter.first;
- return Status::OK();
- }
- }
- return {Status::NotFound};
-}
-
-Status Config::SetNamespace(const std::string &ns, const std::string &token) {
- if (ns == kDefaultNamespace) {
- return {Status::NotOK, "forbidden to update the default namespace"};
- }
- if (tokens.find(token) != tokens.end()) {
- return {Status::NotOK, "the token has already exists"};
- }
-
- if (token == requirepass || token == masterauth) {
- return {Status::NotOK, "the token is duplicated with requirepass or
masterauth"};
- }
-
- for (const auto &iter : tokens) {
- if (iter.second == ns) {
- tokens.erase(iter.first);
- tokens[token] = ns;
- auto s = Rewrite();
- if (!s.IsOK()) {
- // Need to roll back the old token if fails to rewrite the config
- tokens.erase(token);
- tokens[iter.first] = ns;
- }
- return s;
- }
- }
- return {Status::NotOK, "the namespace was not found"};
-}
-
-Status Config::AddNamespace(const std::string &ns, const std::string &token) {
- if (requirepass.empty()) {
- return {Status::NotOK, "forbidden to add namespace when requirepass was
empty"};
- }
- if (cluster_enabled) {
- return {Status::NotOK, "forbidden to add namespace when cluster mode was
enabled"};
- }
- if (ns == kDefaultNamespace) {
- return {Status::NotOK, "forbidden to add the default namespace"};
- }
- auto s = isNamespaceLegal(ns);
- if (!s.IsOK()) return s;
- if (tokens.find(token) != tokens.end()) {
- return {Status::NotOK, "the token has already exists"};
- }
-
- if (token == requirepass || token == masterauth) {
- return {Status::NotOK, "the token is duplicated with requirepass or
masterauth"};
- }
-
- for (const auto &iter : tokens) {
- if (iter.second == ns) {
- return {Status::NotOK, "the namespace has already exists"};
- }
- }
- tokens[token] = ns;
-
- s = Rewrite();
- if (!s.IsOK()) {
- tokens.erase(token);
- }
- return s;
-}
-
-Status Config::DelNamespace(const std::string &ns) {
- if (ns == kDefaultNamespace) {
- return {Status::NotOK, "forbidden to delete the default namespace"};
- }
- for (const auto &iter : tokens) {
- if (iter.second == ns) {
- tokens.erase(iter.first);
- auto s = Rewrite();
- if (!s.IsOK()) {
- tokens[iter.first] = ns;
- }
- return s;
- }
- }
- return {Status::NotOK, "the namespace was not found"};
-}
-
-Status Config::isNamespaceLegal(const std::string &ns) {
- if (ns.size() > UINT8_MAX) {
- return {Status::NotOK, fmt::format("size exceed limit {}", UINT8_MAX)};
- }
- char last_char = ns.back();
- if (last_char == std::numeric_limits<char>::max()) {
- return {Status::NotOK, "namespace contain illegal letter"};
- }
- return Status::OK();
-}
diff --git a/src/config/config.h b/src/config/config.h
index b93496b7..d79d1527 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -136,7 +136,7 @@ struct Config {
CompactionCheckerRange compaction_checker_range{-1, -1};
int64_t force_compact_file_age;
int force_compact_file_min_deleted_percentage;
- std::map<std::string, std::string> tokens;
+ bool repl_namespace_enabled = false;
std::string replica_announce_ip;
uint32_t replica_announce_port = 0;
@@ -149,6 +149,11 @@ struct Config {
bool redis_cursor_compatible = false;
int log_retention_days;
+
+ // load_tokens is used to buffer the tokens when loading,
+ // don't use it to authenticate or rewrite the configuration file.
+ std::map<std::string, std::string> load_tokens;
+
// profiling
int profiling_sample_ratio = 0;
int profiling_sample_record_threshold_ms = 0;
@@ -210,16 +215,13 @@ struct Config {
mutable std::mutex backup_mu;
std::string NodesFilePath() const;
- Status Rewrite();
+ Status Rewrite(const std::map<std::string, std::string> &tokens);
Status Load(const CLIOptions &path);
void Get(const std::string &key, std::vector<std::string> *values) const;
Status Set(Server *svr, std::string key, const std::string &value);
void SetMaster(const std::string &host, uint32_t port);
void ClearMaster();
- Status GetNamespace(const std::string &ns, std::string *token) const;
- Status AddNamespace(const std::string &ns, const std::string &token);
- Status SetNamespace(const std::string &ns, const std::string &token);
- Status DelNamespace(const std::string &ns);
+ bool IsSlave() const { return !master_host.empty(); }
private:
std::string path_;
@@ -237,5 +239,4 @@ struct Config {
Status parseConfigFromPair(const std::pair<std::string, std::string> &input,
int line_number);
Status parseConfigFromString(const std::string &input, int line_number);
Status finish();
- static Status isNamespaceLegal(const std::string &ns);
};
diff --git a/src/server/namespace.cc b/src/server/namespace.cc
new file mode 100644
index 00000000..a0e8d05a
--- /dev/null
+++ b/src/server/namespace.cc
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "namespace.h"
+
+#include <exception>
+
+#include "jsoncons/json.hpp"
+
+// Error messages
+constexpr const char* kErrNamespaceExists = "the namespace already exists";
+constexpr const char* kErrTokenExists = "the token already exists";
+constexpr const char* kErrNamespaceNotFound = "the namespace was not found";
+constexpr const char* kErrRequiredPassEmpty = "forbidden to add namespace when
requirepass was empty";
+constexpr const char* kErrClusterModeEnabled = "forbidden to add namespace
when cluster mode was enabled";
+constexpr const char* kErrDeleteDefaultNamespace = "forbidden to delete the
default namespace";
+constexpr const char* kErrAddDefaultNamespace = "forbidden to add the default
namespace";
+constexpr const char* kErrInvalidToken = "the token is duplicated with
requirepass or masterauth";
+
+// Checks whether `ns` can be used as a namespace name.
+//
+// Returns NotOK when the name is longer than UINT8_MAX bytes, or when its last
+// byte equals std::numeric_limits<char>::max(); returns OK otherwise.
+Status IsNamespaceLegal(const std::string& ns) {
+  if (ns.size() > UINT8_MAX) {
+    return {Status::NotOK, fmt::format("size exceed limit {}", UINT8_MAX)};
+  }
+  // std::string::back() has undefined behaviour on an empty string, so the last
+  // byte is only inspected when there is one.
+  if (!ns.empty() && ns.back() == std::numeric_limits<char>::max()) {
+    return {Status::NotOK, "namespace contain illegal letter"};
+  }
+  return Status::OK();
+}
+
+// Rebuilds the in-memory token table and persists it through Rewrite().
+//
+// Tokens parsed from the configuration file (config->load_tokens) are taken
+// first; entries stored under kNamespaceDBKey in the propagate column family
+// are then merged in for tokens that are not already present.
+//
+// Returns NotOK when the DB read fails, when the DB key exists while
+// repl_namespace_enabled is off, or when the stored value cannot be decoded;
+// otherwise returns the status of Rewrite().
+Status Namespace::LoadAndRewrite() {
+  auto config = storage_->GetConfig();
+  // Load from the configuration file first
+  tokens_ = config->load_tokens;
+
+  // We would like to load namespaces from db even if repl_namespace_enabled is false,
+  // this can avoid missing some namespaces when turn on/off repl_namespace_enabled.
+  std::string value;
+  auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return {Status::NotOK, s.ToString()};
+  }
+  if (s.ok()) {
+    // The namespace db key exists, so switching off repl_namespace_enabled is not allowed
+    if (!config->repl_namespace_enabled) {
+      return {Status::NotOK, "cannot switch off repl_namespace_enabled when namespaces exist in db"};
+    }
+
+    // jsoncons reports malformed or unexpectedly-typed input by throwing. Decode into a
+    // temporary map and convert any failure into a Status, so that a corrupted value can
+    // neither escape as an exception nor leave tokens_ half merged.
+    std::map<std::string, std::string> db_tokens;
+    try {
+      jsoncons::json j = jsoncons::json::parse(value);
+      for (const auto& iter : j.object_range()) {
+        db_tokens.emplace(iter.key(), iter.value().as<std::string>());
+      }
+    } catch (const std::exception& e) {
+      return {Status::NotOK, std::string("failed to parse namespaces from db: ") + e.what()};
+    }
+    // merge the namespaces from db; insert() keeps entries that already exist in tokens_
+    tokens_.insert(db_tokens.begin(), db_tokens.end());
+  }
+
+  return Rewrite();
+}
+
+// Returns the token bound to namespace `ns`.
+// tokens_ is keyed by token, so the namespace is searched for among the mapped
+// values; Status::NotFound is returned when no entry maps to `ns`.
+StatusOr<std::string> Namespace::Get(const std::string& ns) const {
+  for (auto it = tokens_.cbegin(); it != tokens_.cend(); ++it) {
+    if (it->second != ns) continue;
+    return it->first;
+  }
+  return {Status::NotFound};
+}
+
+// Returns the namespace that `token` grants access to, or Status::NotFound
+// when the token is not registered.
+StatusOr<std::string> Namespace::GetByToken(const std::string& token) const {
+  const auto found = tokens_.find(token);
+  if (found != tokens_.end()) {
+    return found->second;
+  }
+  return {Status::NotFound};
+}
+
+// Binds `token` to namespace `ns`, replacing the namespace's previous token if
+// it had one, and persists the result through Rewrite().
+//
+// Returns NotOK when the namespace name is illegal, requirepass is empty,
+// cluster mode is enabled, `ns` is the default namespace, `token` equals
+// requirepass/masterauth, or `token` already belongs to another namespace.
+// When Rewrite() fails its status is returned and the in-memory table is put
+// back to the state it had before the call.
+Status Namespace::Set(const std::string& ns, const std::string& token) {
+  auto s = IsNamespaceLegal(ns);
+  if (!s.IsOK()) return s;
+  auto config = storage_->GetConfig();
+  if (config->requirepass.empty()) {
+    return {Status::NotOK, kErrRequiredPassEmpty};
+  }
+  if (config->cluster_enabled) {
+    return {Status::NotOK, kErrClusterModeEnabled};
+  }
+  if (ns == kDefaultNamespace) {
+    return {Status::NotOK, kErrAddDefaultNamespace};
+  }
+  if (token == config->requirepass || token == config->masterauth) {
+    return {Status::NotOK, kErrInvalidToken};
+  }
+
+  // Assigning a token that is owned by a different namespace would silently
+  // leave that namespace without any token, so refuse it.
+  auto owner = tokens_.find(token);
+  if (owner != tokens_.end() && owner->second != ns) {
+    return {Status::NotOK, kErrTokenExists};
+  }
+
+  // Need to delete the old token first; keep a copy so it can be restored.
+  bool had_old_token = false;
+  std::string old_token;
+  for (auto iter = tokens_.begin(); iter != tokens_.end(); ++iter) {
+    if (iter->second == ns) {
+      old_token = iter->first;
+      had_old_token = true;
+      tokens_.erase(iter);
+      break;
+    }
+  }
+  tokens_[token] = ns;
+
+  s = Rewrite();
+  if (!s.IsOK()) {
+    // Roll back both halves of the change: drop the new token and restore the old one.
+    tokens_.erase(token);
+    if (had_old_token) tokens_[old_token] = ns;
+    return s;
+  }
+  return Status::OK();
+}
+
+// Registers a new namespace `ns` guarded by `token`.
+//
+// Repeating an existing (namespace, token) pair is accepted and changes nothing.
+// Returns NotOK when the namespace exists with a different token or when the
+// token is already in use; otherwise defers to Set().
+Status Namespace::Add(const std::string& ns, const std::string& token) {
+  // The namespace must not be registered yet, unless it is the very same pair.
+  for (auto it = tokens_.cbegin(); it != tokens_.cend(); ++it) {
+    if (it->second != ns) continue;
+    if (it->first == token) return Status::OK();
+    return {Status::NotOK, kErrNamespaceExists};
+  }
+  // The token must not be taken either.
+  if (tokens_.count(token) > 0) {
+    return {Status::NotOK, kErrTokenExists};
+  }
+  return Set(ns, token);
+}
+
+// Removes namespace `ns` together with its token and persists the result
+// through Rewrite().
+//
+// Returns NotOK when `ns` is the default namespace or is not registered. When
+// Rewrite() fails its status is returned and the removed entry is put back.
+Status Namespace::Del(const std::string& ns) {
+  if (ns == kDefaultNamespace) {
+    return {Status::NotOK, kErrDeleteDefaultNamespace};
+  }
+
+  for (auto iter = tokens_.begin(); iter != tokens_.end(); ++iter) {
+    if (iter->second != ns) continue;
+
+    // Copy the token before erasing: erase() destroys the map node and the key stored
+    // in it, so the entry must not be read through the iterator afterwards.
+    const std::string token = iter->first;
+    tokens_.erase(iter);
+    auto s = Rewrite();
+    if (!s.IsOK()) {
+      tokens_[token] = ns;
+      return s;
+    }
+    return Status::OK();
+  }
+  return {Status::NotOK, kErrNamespaceNotFound};
+}
+
+// Persists the current token table.
+//
+// The table is always handed to Config::Rewrite(). It is additionally written
+// to the propagate column family under kNamespaceDBKey as a JSON object
+// (token -> namespace), but only when this instance is not a slave and
+// repl_namespace_enabled is on.
+Status Namespace::Rewrite() {
+  auto config = storage_->GetConfig();
+  if (auto s = config->Rewrite(tokens_); !s.IsOK()) {
+    return s;
+  }
+
+  // A slave must not write to the DB, to prevent increasing the DB sequence number;
+  // and there is nothing to write when namespaces are not replicated.
+  const bool skip_db_write = config->IsSlave() || !config->repl_namespace_enabled;
+  if (skip_db_write) {
+    return Status::OK();
+  }
+
+  jsoncons::json payload;
+  for (auto it = tokens_.cbegin(); it != tokens_.cend(); ++it) {
+    payload[it->first] = it->second;
+  }
+  return storage_->WriteToPropagateCF(kNamespaceDBKey, payload.to_string());
+}
diff --git a/src/server/namespace.h b/src/server/namespace.h
new file mode 100644
index 00000000..cccf46e0
--- /dev/null
+++ b/src/server/namespace.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "storage/storage.h"
+
+constexpr const char *kNamespaceDBKey = "__namespace_keys__";
+
+// Owns the table that maps a token to its namespace and persists it through
+// the storage engine. Instances are not copyable.
+class Namespace {
+ public:
+  // Looks up the propagate column family once; `storage` is borrowed, not owned.
+  explicit Namespace(engine::Storage *storage)
+      : storage_(storage), cf_(storage->GetCFHandle(engine::kPropagateColumnFamilyName)) {}
+
+  Namespace(const Namespace &) = delete;
+  Namespace &operator=(const Namespace &) = delete;
+  ~Namespace() = default;
+
+  // Reloads the table from the configuration and the DB, then persists it.
+  Status LoadAndRewrite();
+  // Persists the current table.
+  Status Rewrite();
+
+  // Lookups: token of a namespace, and namespace of a token.
+  StatusOr<std::string> Get(const std::string &ns) const;
+  StatusOr<std::string> GetByToken(const std::string &token) const;
+  // Read-only view of the whole table, keyed by token.
+  const std::map<std::string, std::string> &List() const { return tokens_; }
+
+  // Mutations; each one persists the table via Rewrite().
+  Status Set(const std::string &ns, const std::string &token);
+  Status Add(const std::string &ns, const std::string &token);
+  Status Del(const std::string &ns);
+
+ private:
+  engine::Storage *storage_;
+  rocksdb::ColumnFamilyHandle *cf_ = nullptr;
+  // token -> namespace
+  std::map<std::string, std::string> tokens_;
+};
diff --git a/src/server/server.cc b/src/server/server.cc
index a023182c..412e7967 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -54,7 +54,7 @@
constexpr const char *REDIS_VERSION = "4.0.0";
Server::Server(engine::Storage *storage, Config *config)
- : storage(storage), start_time_(util::GetTimeStamp()), config_(config) {
+ : storage(storage), start_time_(util::GetTimeStamp()), config_(config),
namespace_(storage) {
// init commands stats here to prevent concurrent insert, and cause core
auto commands = redis::GetOriginalCommands();
for (const auto &iter : *commands) {
@@ -132,12 +132,16 @@ Server::~Server() {
// - feed-replica-data-info: generate checkpoint and send files list when
full sync
// - feed-replica-file: send SST files when slaves ask for full sync
Status Server::Start() {
+ auto s = namespace_.LoadAndRewrite();
+ if (!s.IsOK()) {
+ return s;
+ }
if (!config_->master_host.empty()) {
- Status s = AddMaster(config_->master_host,
static_cast<uint32_t>(config_->master_port), false);
+ s = AddMaster(config_->master_host,
static_cast<uint32_t>(config_->master_port), false);
if (!s.IsOK()) return s;
} else {
// Generate new replication id if not a replica
- auto s = storage->ShiftReplId();
+ s = storage->ShiftReplId();
if (!s.IsOK()) {
return s.Prefixed("failed to shift replication id");
}
@@ -1394,7 +1398,7 @@ Status Server::autoResizeBlockAndSST() {
config_->rocks_db.block_size = block_size;
}
- auto s = config_->Rewrite();
+ auto s = config_->Rewrite(namespace_.List());
LOG(INFO) << "[server] Rewrite config, result: " << s.Msg();
return Status::OK();
diff --git a/src/server/server.h b/src/server/server.h
index 9098d3f1..b6ac94ec 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -42,6 +42,7 @@
#include "cluster/slot_migrate.h"
#include "commands/commander.h"
#include "lua.hpp"
+#include "namespace.h"
#include "server/redis_connection.h"
#include "stats/log_collector.h"
#include "stats/stats.h"
@@ -287,6 +288,7 @@ class Server {
static bool IsWatchedKeysModified(redis::Connection *conn);
void ResetWatchedKeys(redis::Connection *conn);
std::list<std::pair<std::string, uint32_t>> GetSlaveHostAndPort();
+ Namespace *GetNamespace() { return &namespace_; }
#ifdef ENABLE_OPENSSL
UniqueSSLContext ssl_ctx;
@@ -325,6 +327,9 @@ class Server {
std::list<std::unique_ptr<FeedSlaveThread>> slave_threads_;
std::atomic<int> fetch_file_threads_num_ = 0;
+ // namespace
+ Namespace namespace_;
+
// Some jobs to operate DB should be unique
std::mutex db_job_mu_;
bool db_compacting_ = false;
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 7c380793..902f7e76 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -145,7 +145,7 @@ class Storage {
uint64_t GetCompactionCount() const { return compaction_count_; }
void IncrCompactionCount(uint64_t n) { compaction_count_.fetch_add(n); }
bool IsSlotIdEncoded() const { return config_->slot_id_encoded; }
- const Config *GetConfig() const { return config_; }
+ Config *GetConfig() const { return config_; }
Status BeginTxn();
Status CommitTxn();
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index 94c51eb9..d62dc615 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -91,7 +91,7 @@ TEST(Config, GetAndSet) {
EXPECT_EQ(values[0], iter.first);
EXPECT_EQ(values[1], iter.second);
}
- ASSERT_TRUE(config.Rewrite().IsOK());
+ ASSERT_TRUE(config.Rewrite({}).IsOK());
s = config.Load(CLIOptions(path));
EXPECT_TRUE(s.IsOK());
for (const auto &iter : mutable_cases) {
@@ -174,7 +174,7 @@ TEST(Config, Rewrite) {
redis::ResetCommands();
Config config;
ASSERT_TRUE(config.Load(CLIOptions(path)).IsOK());
- ASSERT_TRUE(config.Rewrite().IsOK());
+ ASSERT_TRUE(config.Rewrite({}).IsOK());
// Need to re-populate the command table since it has renamed by the previous
redis::ResetCommands();
Config new_config;
@@ -182,144 +182,6 @@ TEST(Config, Rewrite) {
unlink(path);
}
-TEST(Namespace, Add) {
- const char *path = "test.conf";
- unlink(path);
-
- Config config;
- auto s = config.Load(CLIOptions(path));
- EXPECT_FALSE(s.IsOK());
- config.slot_id_encoded = false;
- EXPECT_TRUE(!config.AddNamespace("ns", "t0").IsOK());
- config.requirepass = "foobared";
-
- std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
- std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
- for (size_t i = 0; i < namespaces.size(); i++) {
- EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- std::string token;
- s = config.GetNamespace(namespaces[i], &token);
- EXPECT_TRUE(s.IsOK());
- EXPECT_EQ(token, tokens[i]);
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- s = config.AddNamespace(namespaces[i], tokens[i]);
- EXPECT_FALSE(s.IsOK());
- EXPECT_EQ(s.Msg(), "the token has already exists");
- }
- s = config.AddNamespace("n1", "t0");
- EXPECT_FALSE(s.IsOK());
- EXPECT_EQ(s.Msg(), "the namespace has already exists");
-
- s = config.AddNamespace(kDefaultNamespace, "mytoken");
- EXPECT_FALSE(s.IsOK());
- EXPECT_EQ(s.Msg(), "forbidden to add the default namespace");
- unlink(path);
-}
-
-TEST(Namespace, Set) {
- const char *path = "test.conf";
- unlink(path);
-
- Config config;
- auto s = config.Load(CLIOptions(path));
- EXPECT_FALSE(s.IsOK());
- config.slot_id_encoded = false;
- config.requirepass = "foobared";
- std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
- std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
- std::vector<std::string> new_tokens = {"nt1", "nt2'", "nt3", "nt4"};
- for (size_t i = 0; i < namespaces.size(); i++) {
- s = config.SetNamespace(namespaces[i], tokens[i]);
- EXPECT_FALSE(s.IsOK());
- EXPECT_EQ(s.Msg(), "the namespace was not found");
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- std::string token;
- s = config.GetNamespace(namespaces[i], &token);
- EXPECT_TRUE(s.IsOK());
- EXPECT_EQ(token, tokens[i]);
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- EXPECT_TRUE(config.SetNamespace(namespaces[i], new_tokens[i]).IsOK());
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- std::string token;
- s = config.GetNamespace(namespaces[i], &token);
- EXPECT_TRUE(s.IsOK());
- EXPECT_EQ(token, new_tokens[i]);
- }
- unlink(path);
-}
-
-TEST(Namespace, Delete) {
- const char *path = "test.conf";
- unlink(path);
-
- Config config;
- auto s = config.Load(CLIOptions(path));
- EXPECT_FALSE(s.IsOK());
- config.slot_id_encoded = false;
- config.requirepass = "foobared";
- std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
- std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
- for (size_t i = 0; i < namespaces.size(); i++) {
- EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
- }
- for (size_t i = 0; i < namespaces.size(); i++) {
- std::string token;
- s = config.GetNamespace(namespaces[i], &token);
- EXPECT_TRUE(s.IsOK());
- EXPECT_EQ(token, tokens[i]);
- }
- for (const auto &ns : namespaces) {
- s = config.DelNamespace(ns);
- EXPECT_TRUE(s.IsOK());
- std::string token;
- s = config.GetNamespace(ns, &token);
- EXPECT_FALSE(s.IsOK());
- EXPECT_TRUE(token.empty());
- }
- unlink(path);
-}
-
-TEST(Namespace, RewriteNamespaces) {
- const char *path = "test.conf";
- unlink(path);
- Config config;
- auto s = config.Load(CLIOptions(path));
- EXPECT_FALSE(s.IsOK());
- config.requirepass = "test";
- config.backup_dir = "test";
- config.slot_id_encoded = false;
- std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
- std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
- for (size_t i = 0; i < namespaces.size(); i++) {
- EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
- }
- EXPECT_TRUE(config.AddNamespace("to-be-deleted-ns",
"to-be-deleted-token").IsOK());
- EXPECT_TRUE(config.DelNamespace("to-be-deleted-ns").IsOK());
-
- Config new_config;
- s = new_config.Load(CLIOptions(path));
- EXPECT_TRUE(s.IsOK());
- for (size_t i = 0; i < namespaces.size(); i++) {
- std::string token;
- s = new_config.GetNamespace(namespaces[i], &token);
- EXPECT_TRUE(s.IsOK());
- EXPECT_EQ(token, tokens[i]);
- }
-
- std::string token;
- EXPECT_FALSE(new_config.GetNamespace("to-be-deleted-ns", &token).IsOK());
- unlink(path);
-}
-
TEST(Config, ParseConfigLine) {
ASSERT_EQ(*ParseConfigLine(""), ConfigKV{});
ASSERT_EQ(*ParseConfigLine("# hello"), ConfigKV{});
diff --git a/tests/cppunit/namespace_test.cc b/tests/cppunit/namespace_test.cc
new file mode 100644
index 00000000..7d26a18e
--- /dev/null
+++ b/tests/cppunit/namespace_test.cc
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "server/namespace.h"
+
+#include <gtest/gtest.h>
+
+#include <fstream>
+
+#include "config/config.h"
+#include "test_base.h"
+
+class NamespaceTest : public TestBase {
+ protected:
+ explicit NamespaceTest() { config_->requirepass = "123"; }
+
+ void SetUp() override {}
+ void TearDown() override {}
+};
+
+TEST_F(NamespaceTest, AddAndDelete) {
+ for (const auto &v : {true, false}) {
+ auto ns = std::make_unique<Namespace>(storage_);
+ std::map<std::string, std::string> tokens = {
+ {"tokens2", "test_ns"}, {"tokens", "test_ns2"}, {"tokens3", "test_ns3"}};
+ config_->repl_namespace_enabled = v;
+ for (const auto &iter : tokens) {
+ ASSERT_TRUE(ns->Add(iter.second, iter.first).IsOK());
+ }
+
+ // test add duplicate namespace
+ for (const auto &iter : tokens) {
+ ASSERT_FALSE(ns->Add(iter.second, "new_" + iter.first).IsOK());
+ }
+
+ for (const auto &iter : tokens) {
+ ASSERT_EQ(iter.first, ns->Get(iter.second).GetValue());
+ }
+
+ for (const auto &iter : tokens) {
+ ASSERT_TRUE(ns->Set(iter.second, "new_" + iter.first).IsOK());
+ }
+
+ auto list_tokens = ns->List();
+ ASSERT_EQ(list_tokens.size(), tokens.size());
+ for (const auto &iter : tokens) {
+ ASSERT_EQ(iter.second, list_tokens["new_" + iter.first]);
+ }
+
+ for (const auto &iter : tokens) {
+ ASSERT_TRUE(ns->Del(iter.second).IsOK());
+ }
+ ASSERT_EQ(0, ns->List().size());
+ }
+}
diff --git a/tests/cppunit/test_base.h b/tests/cppunit/test_base.h
index 4bfd0c32..043b9329 100644
--- a/tests/cppunit/test_base.h
+++ b/tests/cppunit/test_base.h
@@ -23,6 +23,7 @@
#include <gtest/gtest.h>
#include <filesystem>
+#include <fstream>
#include "storage/redis_db.h"
#include "types/redis_hash.h"
@@ -30,13 +31,19 @@
class TestBase : public testing::Test { // NOLINT
protected:
explicit TestBase() : config_(new Config()) {
+ const char *path = "test.conf";
+ unlink(path);
+ std::ofstream output_file(path, std::ios::out);
+ output_file << "";
+
+ auto s = config_->Load(CLIOptions(path));
config_->db_dir = "testdb";
config_->backup_dir = "testdb/backup";
config_->rocks_db.compression = rocksdb::CompressionType::kNoCompression;
config_->rocks_db.write_buffer_size = 1;
config_->rocks_db.block_size = 100;
storage_ = new engine::Storage(config_);
- Status s = storage_->Open();
+ s = storage_->Open();
if (!s.IsOK()) {
std::cout << "Failed to open the storage, encounter error: " << s.Msg() << std::endl;
assert(s.IsOK());
diff --git a/tests/gocase/unit/namespace/namespace_test.go b/tests/gocase/unit/namespace/namespace_test.go
index 1312217c..bc04a7c3 100644
--- a/tests/gocase/unit/namespace/namespace_test.go
+++ b/tests/gocase/unit/namespace/namespace_test.go
@@ -56,8 +56,8 @@ func TestNamespace(t *testing.T) {
}
// duplicate add the same namespace
for ns, token := range nsTokens {
- r := rdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
- util.ErrorRegexp(t, r.Err(), ".*ERR the token has already exists.*")
+ r := rdb.Do(ctx, "NAMESPACE", "ADD", ns, "new"+token)
+ util.ErrorRegexp(t, r.Err(), ".*ERR the namespace already exists.*")
}
for ns, token := range nsTokens {
r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
@@ -71,6 +71,44 @@ func TestNamespace(t *testing.T) {
}
})
+ t.Run("Namespace exists after restart", func(t *testing.T) {
+ for _, enableNamespaceReplication := range []string{"no", "yes"} {
+ require.NoError(t, rdb.ConfigSet(ctx,
+ "repl-namespace-enabled",
+ enableNamespaceReplication,
+ ).Err())
+ require.NoError(t, rdb.ConfigRewrite(ctx).Err())
+ nsTokens := map[string]string{
+ "n1": "t1",
+ "n2": "t2",
+ "n3": "t3",
+ "n4": "t4",
+ }
+ for ns, token := range nsTokens {
+ r := rdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ }
+ for ns, token := range nsTokens {
+ r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, token, r.Val())
+ }
+
+ srv.Restart()
+ for ns, token := range nsTokens {
+ r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, token, r.Val())
+ }
+ for ns := range nsTokens {
+ r := rdb.Do(ctx, "NAMESPACE", "DEL", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ }
+ }
+ })
+
t.Run("Concurrent creating namespaces", func(t *testing.T) {
threads := 4
countPerThread := 10
@@ -101,3 +139,105 @@ func TestNamespace(t *testing.T) {
})
})
}
+
+func TestNamespaceReplicate(t *testing.T) {
+ password := "pwd"
+ masterSrv := util.StartServer(t, map[string]string{
+ "requirepass": password,
+ })
+ defer masterSrv.Close()
+ masterRdb := masterSrv.NewClientWithOption(&redis.Options{
+ Password: password,
+ })
+ defer func() { require.NoError(t, masterRdb.Close()) }()
+
+ slaveSrv := util.StartServer(t, map[string]string{
+ "masterauth": password,
+ "requirepass": password,
+ })
+ defer slaveSrv.Close()
+ slaveRdb := slaveSrv.NewClientWithOption(&redis.Options{
+ Password: password,
+ })
+ defer func() { require.NoError(t, slaveRdb.Close()) }()
+
+ util.SlaveOf(t, slaveRdb, masterSrv)
+ util.WaitForSync(t, slaveRdb)
+
+ ctx := context.Background()
+ nsTokens := map[string]string{
+ "n1": "t1",
+ "n2": "t2",
+ "n3": "t3",
+ "n4": "t4",
+ }
+
+ t.Run("Disable Replicate namespces", func(t *testing.T) {
+ require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err())
+ require.NoError(t, slaveRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err())
+
+ for ns, token := range nsTokens {
+ r := masterRdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ }
+ util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+
+ // Can read namespaces on master
+ for ns, token := range nsTokens {
+ r := masterRdb.Do(ctx, "NAMESPACE", "GET", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, token, r.Val())
+ }
+ // Can't read namespaces on slave
+ for ns := range nsTokens {
+ r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
+ require.EqualError(t, r.Err(), redis.Nil.Error())
+ }
+
+ for ns := range nsTokens {
+ r := masterRdb.Do(ctx, "NAMESPACE", "DEL", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ }
+ })
+
+ t.Run("Enable Replicate namespaces", func(t *testing.T) {
+ require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "yes").Err())
+ require.NoError(t, slaveRdb.ConfigSet(ctx, "repl-namespace-enabled", "yes").Err())
+
+ for ns, token := range nsTokens {
+ r := masterRdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ }
+ util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+
+ for ns, token := range nsTokens {
+ r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, token, r.Val())
+ }
+
+ for ns := range nsTokens {
+ r := masterRdb.Do(ctx, "NAMESPACE", "DEL", ns)
+ require.NoError(t, r.Err())
+ require.Equal(t, "OK", r.Val())
+ }
+ util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+
+ for ns := range nsTokens {
+ r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
+ require.EqualError(t, r.Err(), redis.Nil.Error())
+ }
+ })
+
+ t.Run("Don't allow to operate slave's namespace if replication is enabled", func(t *testing.T) {
+ r := slaveRdb.Do(ctx, "NAMESPACE", "ADD", "ns_xxxx", "token_xxxx")
+ util.ErrorRegexp(t, r.Err(), ".*ERR namespace is read-only for slave.*")
+ })
+
+ t.Run("Turn off namespace replication is not allowed", func(t *testing.T) {
+ util.ErrorRegexp(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err(), ".*cannot switch off repl_namespace_enabled when namespaces exist in db.*")
+ })
+}