This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new f9728192 Add the support of the namespace replication (#1776)
f9728192 is described below

commit f97281928f51188b0592e93110981c8a739bbe64
Author: hulk <[email protected]>
AuthorDate: Mon Oct 9 14:15:52 2023 +0800

    Add the support of the namespace replication (#1776)
    
    Currently, Kvrocks won't propagate the namespace and token between master 
and slaves, and it may cause trouble if users add them in master but missing in 
slaves. So it would make this process easier if we could propagate the 
namespace/token as well.
    
    
    ## Design
    
    To implement this feature, we need to consider the following situations:
    
    - How to replicate namespaces between the master and slaves
    - How to be compatible with old versions
    
    **For the replication:**
    
    We now have the propagate column family to propagate commands between the 
master and slaves, so we can use it to replicate namespaces as well. The steps 
are like below:
    
    1. Modify namespace/token in master(forbid changing in slaves)
    2. Propagate the namespace event to notify slaves like the Lua script
    3. Slaves replay namespace/token changes
    
    This works well for new instances, but what if there are different 
namespaces between the master and slaves in old instances?  There are three 
possible conditions:
    
    1. Namespace/token exists in master but NOT in slave
    2. Namespace exists in slave but NOT in master
    3. Namespace exists both in master and slave, but the token is different
    
    For condition 1, we can add the namespace/token to the slave as well. For 
condition 2, it makes sense to keep the namespace NOT existing in the master 
since data are from it, so we can remove the namespace/token. The most 
troublesome condition is 3 since users may have already used the token to 
access the data, changing the token would cause an invalid password error. **To 
make this simple, I prefer also overwriting the token.** And offer the 
configuration to allow users to enable or  [...]
    
    To be noticed, we don't expect to propagate the default namespace since it 
may affect the replication part, so we won't persist the default namespace.
    
    **For the compatible issue:**
    
    Currently, namespaces are kept in the configuration file, so we need to 
read from the rocksdb first(store in propagate column like Lua script) when 
starting:
    
    - If the key exists in rocksdb, load namespace/token from rocksdb and check 
if any namespace/token is inside the configuration file. If yes, refuse to 
start the server and ask users to remove them and add them via the `namespace 
add` command.
    - If the key does NOT exist in rocksdb, read namespace/token from the 
configuration file and write them into rocksdb. To be noticed, we need to add 
an empty value for the key even though no namespace/token.
    
    Users can only modify the namespace via command if this feature is enabled.
---
 kvrocks.conf                                  |   9 ++
 src/cluster/replication.cc                    |   5 +
 src/commands/cmd_server.cc                    |  35 ++---
 src/config/config.cc                          | 128 +++---------------
 src/config/config.h                           |  15 ++-
 src/server/namespace.cc                       | 182 ++++++++++++++++++++++++++
 src/server/namespace.h                        |  50 +++++++
 src/server/server.cc                          |  12 +-
 src/server/server.h                           |   5 +
 src/storage/storage.h                         |   2 +-
 tests/cppunit/config_test.cc                  | 142 +-------------------
 tests/cppunit/namespace_test.cc               |  71 ++++++++++
 tests/cppunit/test_base.h                     |   9 +-
 tests/gocase/unit/namespace/namespace_test.go | 144 +++++++++++++++++++-
 14 files changed, 527 insertions(+), 282 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index d2b027cb..7089cec8 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -51,6 +51,15 @@ daemonize no
 
 cluster-enabled no
 
+# By default, namespaces are stored in the configuration file and won't be 
replicated
+# to replicas. This option allows to change this behavior, so that namespaces 
are also
+# propagated to slaves. Note that:
+# 1) it won't replicate the 'masterauth' to prevent breaking master/replica 
replication
+# 2) it will overwrite replica's namespace with master's namespace, so be 
careful of in-using namespaces
+# 3) cannot switch off the namespace replication once it's enabled
+#
+# Default: no
+repl-namespace-enabled no
 
 # Persist the cluster nodes topology in local file($dir/nodes.conf). This 
configuration
 # takes effect only if the cluster mode was enabled.
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index a6110a86..c2d17b5a 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -975,6 +975,11 @@ Status ReplicationThread::parseWriteBatch(const 
std::string &batch_string) {
             return s.Prefixed("failed to execute propagate command");
           }
         }
+      } else if (write_batch_handler.Key() == kNamespaceDBKey) {
+        auto s = srv_->GetNamespace()->LoadAndRewrite();
+        if (!s.IsOK()) {
+          return s.Prefixed("failed to load namespaces");
+        }
       }
       break;
     case kBatchTypeStream: {
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 232b17c8..fe96074e 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -39,15 +39,15 @@ enum class AuthResult {
   NO_REQUIRE_PASS,
 };
 
-AuthResult AuthenticateUser(Connection *conn, Config *config, const 
std::string &user_password) {
-  auto iter = config->tokens.find(user_password);
-  if (iter != config->tokens.end()) {
-    conn->SetNamespace(iter->second);
+AuthResult AuthenticateUser(Server *srv, Connection *conn, const std::string 
&user_password) {
+  auto ns = srv->GetNamespace()->GetByToken(user_password);
+  if (ns.IsOK()) {
+    conn->SetNamespace(ns.GetValue());
     conn->BecomeUser();
     return AuthResult::OK;
   }
 
-  const auto &requirepass = config->requirepass;
+  const auto &requirepass = srv->GetConfig()->requirepass;
   if (!requirepass.empty() && user_password != requirepass) {
     return AuthResult::INVALID_PASSWORD;
   }
@@ -64,9 +64,8 @@ AuthResult AuthenticateUser(Connection *conn, Config *config, 
const std::string
 class CommandAuth : public Commander {
  public:
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    Config *config = svr->GetConfig();
     auto &user_password = args_[1];
-    AuthResult result = AuthenticateUser(conn, config, user_password);
+    AuthResult result = AuthenticateUser(svr, conn, user_password);
     switch (result) {
       case AuthResult::OK:
         *output = redis::SimpleString("OK");
@@ -89,10 +88,13 @@ class CommandNamespace : public Commander {
 
     Config *config = svr->GetConfig();
     std::string sub_command = util::ToLower(args_[1]);
+    if (config->repl_namespace_enabled && config->IsSlave() && sub_command != 
"get") {
+      return {Status::RedisExecErr, "namespace is read-only for slave"};
+    }
     if (args_.size() == 3 && sub_command == "get") {
       if (args_[2] == "*") {
         std::vector<std::string> namespaces;
-        auto tokens = config->tokens;
+        auto tokens = svr->GetNamespace()->List();
         for (auto &token : tokens) {
           namespaces.emplace_back(token.second);  // namespace
           namespaces.emplace_back(token.first);   // token
@@ -101,26 +103,25 @@ class CommandNamespace : public Commander {
         namespaces.emplace_back(config->requirepass);
         *output = redis::MultiBulkString(namespaces, false);
       } else {
-        std::string token;
-        auto s = config->GetNamespace(args_[2], &token);
-        if (s.Is<Status::NotFound>()) {
+        auto token = svr->GetNamespace()->Get(args_[2]);
+        if (token.Is<Status::NotFound>()) {
           *output = redis::NilString();
         } else {
-          *output = redis::BulkString(token);
+          *output = redis::BulkString(token.GetValue());
         }
       }
     } else if (args_.size() == 4 && sub_command == "set") {
-      Status s = config->SetNamespace(args_[2], args_[3]);
+      Status s = svr->GetNamespace()->Set(args_[2], args_[3]);
       *output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + 
s.Msg());
       LOG(WARNING) << "Updated namespace: " << args_[2] << " with token: " << 
args_[3] << ", addr: " << conn->GetAddr()
                    << ", result: " << s.Msg();
     } else if (args_.size() == 4 && sub_command == "add") {
-      Status s = config->AddNamespace(args_[2], args_[3]);
+      Status s = svr->GetNamespace()->Add(args_[2], args_[3]);
       *output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + 
s.Msg());
       LOG(WARNING) << "New namespace: " << args_[2] << " with token: " << 
args_[3] << ", addr: " << conn->GetAddr()
                    << ", result: " << s.Msg();
     } else if (args_.size() == 3 && sub_command == "del") {
-      Status s = config->DelNamespace(args_[2]);
+      Status s = svr->GetNamespace()->Del(args_[2]);
       *output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + 
s.Msg());
       LOG(WARNING) << "Deleted namespace: " << args_[2] << ", addr: " << 
conn->GetAddr() << ", result: " << s.Msg();
     } else {
@@ -239,7 +240,7 @@ class CommandConfig : public Commander {
     }
 
     if (args_.size() == 2 && sub_command == "rewrite") {
-      Status s = config->Rewrite();
+      Status s = config->Rewrite(svr->GetNamespace()->List());
       if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};
 
       *output = redis::SimpleString("OK");
@@ -709,7 +710,7 @@ class CommandHello final : public Commander {
           next_arg++;
         }
         const auto &user_password = args_[next_arg + 1];
-        auto auth_result = AuthenticateUser(conn, svr->GetConfig(), 
user_password);
+        auto auth_result = AuthenticateUser(svr, conn, user_password);
         switch (auth_result) {
           case AuthResult::INVALID_PASSWORD:
             return {Status::NotOK, "invalid password"};
diff --git a/src/config/config.cc b/src/config/config.cc
index e12edc38..ad922dd6 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -159,6 +159,7 @@ Config::Config() {
       {"log-retention-days", false, new IntField(&log_retention_days, -1, -1, 
INT_MAX)},
       {"persist-cluster-nodes-enabled", false, new 
YesNoField(&persist_cluster_nodes_enabled, true)},
       {"redis-cursor-compatible", false, new 
YesNoField(&redis_cursor_compatible, false)},
+      {"repl-namespace-enabled", false, new 
YesNoField(&repl_namespace_enabled, false)},
 
       /* rocksdb options */
       {"rocksdb.compression", false,
@@ -233,17 +234,17 @@ void Config::initFieldValidator() {
   std::map<std::string, ValidateFn> validators = {
       {"requirepass",
        [this](const std::string &k, const std::string &v) -> Status {
-         if (v.empty() && !tokens.empty()) {
+         if (v.empty() && !load_tokens.empty()) {
            return {Status::NotOK, "requirepass empty not allowed while the 
namespace exists"};
          }
-         if (tokens.find(v) != tokens.end()) {
+         if (load_tokens.find(v) != load_tokens.end()) {
            return {Status::NotOK, "requirepass is duplicated with namespace 
tokens"};
          }
          return Status::OK();
        }},
       {"masterauth",
        [this](const std::string &k, const std::string &v) -> Status {
-         if (tokens.find(v) != tokens.end()) {
+         if (load_tokens.find(v) != load_tokens.end()) {
            return {Status::NotOK, "masterauth is duplicated with namespace 
tokens"};
          }
          return Status::OK();
@@ -515,6 +516,12 @@ void Config::initFieldCallback() {
          remove(nodes_file_path.data());
          return Status::OK();
        }},
+      {"repl-namespace-enabled",
+       [](Server *srv, const std::string &k, const std::string &v) -> Status {
+         if (!srv) return Status::OK();
+         return srv->GetNamespace()->LoadAndRewrite();
+       }},
+
       {"rocksdb.target_file_size_base",
        [this](Server *srv, const std::string &k, const std::string &v) -> 
Status {
          if (!srv) return Status::OK();
@@ -682,7 +689,7 @@ Status Config::parseConfigFromPair(const 
std::pair<std::string, std::string> &in
   if (strncasecmp(input.first.data(), ns_str, ns_str_size) == 0) {
     // namespace should keep key case-sensitive
     field_key = input.first;
-    tokens[input.second] = input.first.substr(ns_str_size);
+    load_tokens[input.second] = input.first.substr(ns_str_size);
     return Status::OK();
   }
 
@@ -711,10 +718,10 @@ Status Config::parseConfigFromString(const std::string 
&input, int line_number)
 }
 
 Status Config::finish() {
-  if (requirepass.empty() && !tokens.empty()) {
+  if (requirepass.empty() && !load_tokens.empty()) {
     return {Status::NotOK, "requirepass empty wasn't allowed while the 
namespace exists"};
   }
-  if ((cluster_enabled) && !tokens.empty()) {
+  if ((cluster_enabled) && !load_tokens.empty()) {
     return {Status::NotOK, "enabled cluster mode wasn't allowed while the 
namespace exists"};
   }
   if (unixsocket.empty() && binds.size() == 0) {
@@ -836,7 +843,7 @@ Status Config::Set(Server *svr, std::string key, const 
std::string &value) {
   return Status::OK();
 }
 
-Status Config::Rewrite() {
+Status Config::Rewrite(const std::map<std::string, std::string> &tokens) {
   if (path_.empty()) {
     return {Status::NotOK, "the server is running without a config file"};
   }
@@ -853,8 +860,10 @@ Status Config::Rewrite() {
   }
 
   std::string namespace_prefix = "namespace.";
-  for (const auto &iter : tokens) {
-    new_config[namespace_prefix + iter.second] = iter.first;
+  if (!repl_namespace_enabled) {  // need to rewrite to the configuration if 
we don't replicate namespaces
+    for (const auto &iter : tokens) {
+      new_config[namespace_prefix + iter.second] = iter.first;
+    }
   }
 
   std::ifstream file(path_);
@@ -900,104 +909,3 @@ Status Config::Rewrite() {
   }
   return Status::OK();
 }
-
-Status Config::GetNamespace(const std::string &ns, std::string *token) const {
-  token->clear();
-  for (const auto &iter : tokens) {
-    if (iter.second == ns) {
-      *token = iter.first;
-      return Status::OK();
-    }
-  }
-  return {Status::NotFound};
-}
-
-Status Config::SetNamespace(const std::string &ns, const std::string &token) {
-  if (ns == kDefaultNamespace) {
-    return {Status::NotOK, "forbidden to update the default namespace"};
-  }
-  if (tokens.find(token) != tokens.end()) {
-    return {Status::NotOK, "the token has already exists"};
-  }
-
-  if (token == requirepass || token == masterauth) {
-    return {Status::NotOK, "the token is duplicated with requirepass or 
masterauth"};
-  }
-
-  for (const auto &iter : tokens) {
-    if (iter.second == ns) {
-      tokens.erase(iter.first);
-      tokens[token] = ns;
-      auto s = Rewrite();
-      if (!s.IsOK()) {
-        // Need to roll back the old token if fails to rewrite the config
-        tokens.erase(token);
-        tokens[iter.first] = ns;
-      }
-      return s;
-    }
-  }
-  return {Status::NotOK, "the namespace was not found"};
-}
-
-Status Config::AddNamespace(const std::string &ns, const std::string &token) {
-  if (requirepass.empty()) {
-    return {Status::NotOK, "forbidden to add namespace when requirepass was 
empty"};
-  }
-  if (cluster_enabled) {
-    return {Status::NotOK, "forbidden to add namespace when cluster mode was 
enabled"};
-  }
-  if (ns == kDefaultNamespace) {
-    return {Status::NotOK, "forbidden to add the default namespace"};
-  }
-  auto s = isNamespaceLegal(ns);
-  if (!s.IsOK()) return s;
-  if (tokens.find(token) != tokens.end()) {
-    return {Status::NotOK, "the token has already exists"};
-  }
-
-  if (token == requirepass || token == masterauth) {
-    return {Status::NotOK, "the token is duplicated with requirepass or 
masterauth"};
-  }
-
-  for (const auto &iter : tokens) {
-    if (iter.second == ns) {
-      return {Status::NotOK, "the namespace has already exists"};
-    }
-  }
-  tokens[token] = ns;
-
-  s = Rewrite();
-  if (!s.IsOK()) {
-    tokens.erase(token);
-  }
-  return s;
-}
-
-Status Config::DelNamespace(const std::string &ns) {
-  if (ns == kDefaultNamespace) {
-    return {Status::NotOK, "forbidden to delete the default namespace"};
-  }
-  for (const auto &iter : tokens) {
-    if (iter.second == ns) {
-      tokens.erase(iter.first);
-      auto s = Rewrite();
-      if (!s.IsOK()) {
-        tokens[iter.first] = ns;
-      }
-      return s;
-    }
-  }
-  return {Status::NotOK, "the namespace was not found"};
-}
-
-Status Config::isNamespaceLegal(const std::string &ns) {
-  if (ns.size() > UINT8_MAX) {
-    return {Status::NotOK, fmt::format("size exceed limit {}", UINT8_MAX)};
-  }
-  char last_char = ns.back();
-  if (last_char == std::numeric_limits<char>::max()) {
-    return {Status::NotOK, "namespace contain illegal letter"};
-  }
-  return Status::OK();
-}
diff --git a/src/config/config.h b/src/config/config.h
index b93496b7..d79d1527 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -136,7 +136,7 @@ struct Config {
   CompactionCheckerRange compaction_checker_range{-1, -1};
   int64_t force_compact_file_age;
   int force_compact_file_min_deleted_percentage;
-  std::map<std::string, std::string> tokens;
+  bool repl_namespace_enabled = false;
   std::string replica_announce_ip;
   uint32_t replica_announce_port = 0;
 
@@ -149,6 +149,11 @@ struct Config {
 
   bool redis_cursor_compatible = false;
   int log_retention_days;
+
+  // load_tokens is used to buffer the tokens when loading,
+  // don't use it to authenticate or rewrite the configuration file.
+  std::map<std::string, std::string> load_tokens;
+
   // profiling
   int profiling_sample_ratio = 0;
   int profiling_sample_record_threshold_ms = 0;
@@ -210,16 +215,13 @@ struct Config {
   mutable std::mutex backup_mu;
 
   std::string NodesFilePath() const;
-  Status Rewrite();
+  Status Rewrite(const std::map<std::string, std::string> &tokens);
   Status Load(const CLIOptions &path);
   void Get(const std::string &key, std::vector<std::string> *values) const;
   Status Set(Server *svr, std::string key, const std::string &value);
   void SetMaster(const std::string &host, uint32_t port);
   void ClearMaster();
-  Status GetNamespace(const std::string &ns, std::string *token) const;
-  Status AddNamespace(const std::string &ns, const std::string &token);
-  Status SetNamespace(const std::string &ns, const std::string &token);
-  Status DelNamespace(const std::string &ns);
+  bool IsSlave() const { return !master_host.empty(); }
 
  private:
   std::string path_;
@@ -237,5 +239,4 @@ struct Config {
   Status parseConfigFromPair(const std::pair<std::string, std::string> &input, 
int line_number);
   Status parseConfigFromString(const std::string &input, int line_number);
   Status finish();
-  static Status isNamespaceLegal(const std::string &ns);
 };
diff --git a/src/server/namespace.cc b/src/server/namespace.cc
new file mode 100644
index 00000000..a0e8d05a
--- /dev/null
+++ b/src/server/namespace.cc
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "namespace.h"
+
+#include "jsoncons/json.hpp"
+
+// Error messages
+constexpr const char* kErrNamespaceExists = "the namespace already exists";
+constexpr const char* kErrTokenExists = "the token already exists";
+constexpr const char* kErrNamespaceNotFound = "the namespace was not found";
+constexpr const char* kErrRequiredPassEmpty = "forbidden to add namespace when 
requirepass was empty";
+constexpr const char* kErrClusterModeEnabled = "forbidden to add namespace 
when cluster mode was enabled";
+constexpr const char* kErrDeleteDefaultNamespace = "forbidden to delete the 
default namespace";
+constexpr const char* kErrAddDefaultNamespace = "forbidden to add the default 
namespace";
+constexpr const char* kErrInvalidToken = "the token is duplicated with 
requirepass or masterauth";
+
+Status IsNamespaceLegal(const std::string& ns) {
+  if (ns.size() > UINT8_MAX) {
+    return {Status::NotOK, fmt::format("size exceed limit {}", UINT8_MAX)};
+  }
+  char last_char = ns.back();
+  if (last_char == std::numeric_limits<char>::max()) {
+    return {Status::NotOK, "namespace contain illegal letter"};
+  }
+  return Status::OK();
+}
+
+Status Namespace::LoadAndRewrite() {
+  auto config = storage_->GetConfig();
+  // Load from the configuration file first
+  tokens_ = config->load_tokens;
+
+  // We would like to load namespaces from db even if repl_namespace_enabled 
is false,
+  // this can avoid missing some namespaces when turn on/off 
repl_namespace_enabled.
+  std::string value;
+  auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return {Status::NotOK, s.ToString()};
+  }
+  if (s.ok()) {
+    // The namespace db key is existed, so it doesn't allow to switch off 
repl_namespace_enabled
+    if (!config->repl_namespace_enabled) {
+      return {Status::NotOK, "cannot switch off repl_namespace_enabled when 
namespaces exist in db"};
+    }
+
+    jsoncons::json j = jsoncons::json::parse(value);
+    for (const auto& iter : j.object_range()) {
+      if (tokens_.find(iter.key()) == tokens_.end()) {
+        // merge the namespace from db
+        tokens_[iter.key()] = iter.value().as<std::string>();
+      }
+    }
+  }
+
+  return Rewrite();
+}
+
+StatusOr<std::string> Namespace::Get(const std::string& ns) const {
+  for (const auto& iter : tokens_) {
+    if (iter.second == ns) {
+      return iter.first;
+    }
+  }
+  return {Status::NotFound};
+}
+
+StatusOr<std::string> Namespace::GetByToken(const std::string& token) const {
+  auto iter = tokens_.find(token);
+  if (iter == tokens_.end()) {
+    return {Status::NotFound};
+  }
+  return iter->second;
+}
+
+Status Namespace::Set(const std::string& ns, const std::string& token) {
+  auto s = IsNamespaceLegal(ns);
+  if (!s.IsOK()) return s;
+  auto config = storage_->GetConfig();
+  if (config->requirepass.empty()) {
+    return {Status::NotOK, kErrRequiredPassEmpty};
+  }
+  if (config->cluster_enabled) {
+    return {Status::NotOK, kErrClusterModeEnabled};
+  }
+  if (ns == kDefaultNamespace) {
+    return {Status::NotOK, kErrAddDefaultNamespace};
+  }
+  if (token == config->requirepass || token == config->masterauth) {
+    return {Status::NotOK, kErrInvalidToken};
+  }
+
+  for (const auto& iter : tokens_) {
+    if (iter.second == ns) {  // need to delete the old token first
+      tokens_.erase(iter.first);
+      break;
+    }
+  }
+  tokens_[token] = ns;
+
+  s = Rewrite();
+  if (!s.IsOK()) {
+    tokens_.erase(token);
+    return s;
+  }
+  return Status::OK();
+}
+
+Status Namespace::Add(const std::string& ns, const std::string& token) {
+  // duplicate namespace
+  for (const auto& iter : tokens_) {
+    if (iter.second == ns) {
+      if (iter.first == token) return Status::OK();
+      return {Status::NotOK, kErrNamespaceExists};
+    }
+  }
+  // duplicate token
+  if (tokens_.find(token) != tokens_.end()) {
+    return {Status::NotOK, kErrTokenExists};
+  }
+  return Set(ns, token);
+}
+
+Status Namespace::Del(const std::string& ns) {
+  if (ns == kDefaultNamespace) {
+    return {Status::NotOK, kErrDeleteDefaultNamespace};
+  }
+
+  for (const auto& iter : tokens_) {
+    if (iter.second == ns) {
+      tokens_.erase(iter.first);
+      auto s = Rewrite();
+      if (!s.IsOK()) {
+        tokens_[iter.first] = iter.second;
+        return s;
+      }
+      return Status::OK();
+    }
+  }
+  return {Status::NotOK, kErrNamespaceNotFound};
+}
+
+Status Namespace::Rewrite() {
+  auto config = storage_->GetConfig();
+  auto s = config->Rewrite(tokens_);
+  if (!s.IsOK()) {
+    return s;
+  }
+
+  // Don't propagate write to DB if its role is slave to prevent from
+  // increasing the DB sequence number.
+  if (config->IsSlave()) {
+    return Status::OK();
+  }
+
+  // Don't need to write to db if repl_namespace_enabled is false
+  if (!config->repl_namespace_enabled) {
+    return Status::OK();
+  }
+  jsoncons::json json;
+  for (const auto& iter : tokens_) {
+    json[iter.first] = iter.second;
+  }
+  return storage_->WriteToPropagateCF(kNamespaceDBKey, json.to_string());
+}
diff --git a/src/server/namespace.h b/src/server/namespace.h
new file mode 100644
index 00000000..cccf46e0
--- /dev/null
+++ b/src/server/namespace.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "storage/storage.h"
+
+constexpr const char *kNamespaceDBKey = "__namespace_keys__";
+
+class Namespace {
+ public:
+  explicit Namespace(engine::Storage *storage) : storage_(storage) {
+    cf_ = storage_->GetCFHandle(engine::kPropagateColumnFamilyName);
+  }
+
+  ~Namespace() = default;
+  Namespace(const Namespace &) = delete;
+  Namespace &operator=(const Namespace &) = delete;
+
+  Status LoadAndRewrite();
+  StatusOr<std::string> Get(const std::string &ns) const;
+  StatusOr<std::string> GetByToken(const std::string &token) const;
+  Status Set(const std::string &ns, const std::string &token);
+  Status Add(const std::string &ns, const std::string &token);
+  Status Del(const std::string &ns);
+  const std::map<std::string, std::string> &List() const { return tokens_; }
+  Status Rewrite();
+
+ private:
+  engine::Storage *storage_;
+  rocksdb::ColumnFamilyHandle *cf_ = nullptr;
+  std::map<std::string, std::string> tokens_;
+};
diff --git a/src/server/server.cc b/src/server/server.cc
index a023182c..412e7967 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -54,7 +54,7 @@
 constexpr const char *REDIS_VERSION = "4.0.0";
 
 Server::Server(engine::Storage *storage, Config *config)
-    : storage(storage), start_time_(util::GetTimeStamp()), config_(config) {
+    : storage(storage), start_time_(util::GetTimeStamp()), config_(config), 
namespace_(storage) {
   // init commands stats here to prevent concurrent insert, and cause core
   auto commands = redis::GetOriginalCommands();
   for (const auto &iter : *commands) {
@@ -132,12 +132,16 @@ Server::~Server() {
 //     - feed-replica-data-info: generate checkpoint and send files list when 
full sync
 //     - feed-replica-file: send SST files when slaves ask for full sync
 Status Server::Start() {
+  auto s = namespace_.LoadAndRewrite();
+  if (!s.IsOK()) {
+    return s;
+  }
   if (!config_->master_host.empty()) {
-    Status s = AddMaster(config_->master_host, 
static_cast<uint32_t>(config_->master_port), false);
+    s = AddMaster(config_->master_host, 
static_cast<uint32_t>(config_->master_port), false);
     if (!s.IsOK()) return s;
   } else {
     // Generate new replication id if not a replica
-    auto s = storage->ShiftReplId();
+    s = storage->ShiftReplId();
     if (!s.IsOK()) {
       return s.Prefixed("failed to shift replication id");
     }
@@ -1394,7 +1398,7 @@ Status Server::autoResizeBlockAndSST() {
     config_->rocks_db.block_size = block_size;
   }
 
-  auto s = config_->Rewrite();
+  auto s = config_->Rewrite(namespace_.List());
   LOG(INFO) << "[server] Rewrite config, result: " << s.Msg();
 
   return Status::OK();
diff --git a/src/server/server.h b/src/server/server.h
index 9098d3f1..b6ac94ec 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -42,6 +42,7 @@
 #include "cluster/slot_migrate.h"
 #include "commands/commander.h"
 #include "lua.hpp"
+#include "namespace.h"
 #include "server/redis_connection.h"
 #include "stats/log_collector.h"
 #include "stats/stats.h"
@@ -287,6 +288,7 @@ class Server {
   static bool IsWatchedKeysModified(redis::Connection *conn);
   void ResetWatchedKeys(redis::Connection *conn);
   std::list<std::pair<std::string, uint32_t>> GetSlaveHostAndPort();
+  Namespace *GetNamespace() { return &namespace_; }
 
 #ifdef ENABLE_OPENSSL
   UniqueSSLContext ssl_ctx;
@@ -325,6 +327,9 @@ class Server {
   std::list<std::unique_ptr<FeedSlaveThread>> slave_threads_;
   std::atomic<int> fetch_file_threads_num_ = 0;
 
+  // namespace
+  Namespace namespace_;
+
   // Some jobs to operate DB should be unique
   std::mutex db_job_mu_;
   bool db_compacting_ = false;
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 7c380793..902f7e76 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -145,7 +145,7 @@ class Storage {
   uint64_t GetCompactionCount() const { return compaction_count_; }
   void IncrCompactionCount(uint64_t n) { compaction_count_.fetch_add(n); }
   bool IsSlotIdEncoded() const { return config_->slot_id_encoded; }
-  const Config *GetConfig() const { return config_; }
+  Config *GetConfig() const { return config_; }
 
   Status BeginTxn();
   Status CommitTxn();
diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc
index 94c51eb9..d62dc615 100644
--- a/tests/cppunit/config_test.cc
+++ b/tests/cppunit/config_test.cc
@@ -91,7 +91,7 @@ TEST(Config, GetAndSet) {
     EXPECT_EQ(values[0], iter.first);
     EXPECT_EQ(values[1], iter.second);
   }
-  ASSERT_TRUE(config.Rewrite().IsOK());
+  ASSERT_TRUE(config.Rewrite({}).IsOK());
   s = config.Load(CLIOptions(path));
   EXPECT_TRUE(s.IsOK());
   for (const auto &iter : mutable_cases) {
@@ -174,7 +174,7 @@ TEST(Config, Rewrite) {
   redis::ResetCommands();
   Config config;
   ASSERT_TRUE(config.Load(CLIOptions(path)).IsOK());
-  ASSERT_TRUE(config.Rewrite().IsOK());
+  ASSERT_TRUE(config.Rewrite({}).IsOK());
   // Need to re-populate the command table since it has renamed by the previous
   redis::ResetCommands();
   Config new_config;
@@ -182,144 +182,6 @@ TEST(Config, Rewrite) {
   unlink(path);
 }
 
-TEST(Namespace, Add) {
-  const char *path = "test.conf";
-  unlink(path);
-
-  Config config;
-  auto s = config.Load(CLIOptions(path));
-  EXPECT_FALSE(s.IsOK());
-  config.slot_id_encoded = false;
-  EXPECT_TRUE(!config.AddNamespace("ns", "t0").IsOK());
-  config.requirepass = "foobared";
-
-  std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
-  std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    std::string token;
-    s = config.GetNamespace(namespaces[i], &token);
-    EXPECT_TRUE(s.IsOK());
-    EXPECT_EQ(token, tokens[i]);
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    s = config.AddNamespace(namespaces[i], tokens[i]);
-    EXPECT_FALSE(s.IsOK());
-    EXPECT_EQ(s.Msg(), "the token has already exists");
-  }
-  s = config.AddNamespace("n1", "t0");
-  EXPECT_FALSE(s.IsOK());
-  EXPECT_EQ(s.Msg(), "the namespace has already exists");
-
-  s = config.AddNamespace(kDefaultNamespace, "mytoken");
-  EXPECT_FALSE(s.IsOK());
-  EXPECT_EQ(s.Msg(), "forbidden to add the default namespace");
-  unlink(path);
-}
-
-TEST(Namespace, Set) {
-  const char *path = "test.conf";
-  unlink(path);
-
-  Config config;
-  auto s = config.Load(CLIOptions(path));
-  EXPECT_FALSE(s.IsOK());
-  config.slot_id_encoded = false;
-  config.requirepass = "foobared";
-  std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
-  std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
-  std::vector<std::string> new_tokens = {"nt1", "nt2'", "nt3", "nt4"};
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    s = config.SetNamespace(namespaces[i], tokens[i]);
-    EXPECT_FALSE(s.IsOK());
-    EXPECT_EQ(s.Msg(), "the namespace was not found");
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    std::string token;
-    s = config.GetNamespace(namespaces[i], &token);
-    EXPECT_TRUE(s.IsOK());
-    EXPECT_EQ(token, tokens[i]);
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    EXPECT_TRUE(config.SetNamespace(namespaces[i], new_tokens[i]).IsOK());
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    std::string token;
-    s = config.GetNamespace(namespaces[i], &token);
-    EXPECT_TRUE(s.IsOK());
-    EXPECT_EQ(token, new_tokens[i]);
-  }
-  unlink(path);
-}
-
-TEST(Namespace, Delete) {
-  const char *path = "test.conf";
-  unlink(path);
-
-  Config config;
-  auto s = config.Load(CLIOptions(path));
-  EXPECT_FALSE(s.IsOK());
-  config.slot_id_encoded = false;
-  config.requirepass = "foobared";
-  std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
-  std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
-  }
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    std::string token;
-    s = config.GetNamespace(namespaces[i], &token);
-    EXPECT_TRUE(s.IsOK());
-    EXPECT_EQ(token, tokens[i]);
-  }
-  for (const auto &ns : namespaces) {
-    s = config.DelNamespace(ns);
-    EXPECT_TRUE(s.IsOK());
-    std::string token;
-    s = config.GetNamespace(ns, &token);
-    EXPECT_FALSE(s.IsOK());
-    EXPECT_TRUE(token.empty());
-  }
-  unlink(path);
-}
-
-TEST(Namespace, RewriteNamespaces) {
-  const char *path = "test.conf";
-  unlink(path);
-  Config config;
-  auto s = config.Load(CLIOptions(path));
-  EXPECT_FALSE(s.IsOK());
-  config.requirepass = "test";
-  config.backup_dir = "test";
-  config.slot_id_encoded = false;
-  std::vector<std::string> namespaces = {"n1", "n2", "n3", "n4"};
-  std::vector<std::string> tokens = {"t1", "t2", "t3", "t4"};
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    EXPECT_TRUE(config.AddNamespace(namespaces[i], tokens[i]).IsOK());
-  }
-  EXPECT_TRUE(config.AddNamespace("to-be-deleted-ns", 
"to-be-deleted-token").IsOK());
-  EXPECT_TRUE(config.DelNamespace("to-be-deleted-ns").IsOK());
-
-  Config new_config;
-  s = new_config.Load(CLIOptions(path));
-  EXPECT_TRUE(s.IsOK());
-  for (size_t i = 0; i < namespaces.size(); i++) {
-    std::string token;
-    s = new_config.GetNamespace(namespaces[i], &token);
-    EXPECT_TRUE(s.IsOK());
-    EXPECT_EQ(token, tokens[i]);
-  }
-
-  std::string token;
-  EXPECT_FALSE(new_config.GetNamespace("to-be-deleted-ns", &token).IsOK());
-  unlink(path);
-}
-
 TEST(Config, ParseConfigLine) {
   ASSERT_EQ(*ParseConfigLine(""), ConfigKV{});
   ASSERT_EQ(*ParseConfigLine("# hello"), ConfigKV{});
diff --git a/tests/cppunit/namespace_test.cc b/tests/cppunit/namespace_test.cc
new file mode 100644
index 00000000..7d26a18e
--- /dev/null
+++ b/tests/cppunit/namespace_test.cc
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "server/namespace.h"
+
+#include <gtest/gtest.h>
+
+#include <fstream>
+
+#include "config/config.h"
+#include "test_base.h"
+
+class NamespaceTest : public TestBase {
+ protected:
+  explicit NamespaceTest() { config_->requirepass = "123"; }
+
+  void SetUp() override {}
+  void TearDown() override {}
+};
+
+TEST_F(NamespaceTest, AddAndDelete) {
+  for (const auto &v : {true, false}) {
+    auto ns = std::make_unique<Namespace>(storage_);
+    std::map<std::string, std::string> tokens = {
+        {"tokens2", "test_ns"}, {"tokens", "test_ns2"}, {"tokens3", 
"test_ns3"}};
+    config_->repl_namespace_enabled = v;
+    for (const auto &iter : tokens) {
+      ASSERT_TRUE(ns->Add(iter.second, iter.first).IsOK());
+    }
+
+    // test add duplicate namespace
+    for (const auto &iter : tokens) {
+      ASSERT_FALSE(ns->Add(iter.second, "new_" + iter.first).IsOK());
+    }
+
+    for (const auto &iter : tokens) {
+      ASSERT_EQ(iter.first, ns->Get(iter.second).GetValue());
+    }
+
+    for (const auto &iter : tokens) {
+      ASSERT_TRUE(ns->Set(iter.second, "new_" + iter.first).IsOK());
+    }
+
+    auto list_tokens = ns->List();
+    ASSERT_EQ(list_tokens.size(), tokens.size());
+    for (const auto &iter : tokens) {
+      ASSERT_EQ(iter.second, list_tokens["new_" + iter.first]);
+    }
+
+    for (const auto &iter : tokens) {
+      ASSERT_TRUE(ns->Del(iter.second).IsOK());
+    }
+    ASSERT_EQ(0, ns->List().size());
+  }
+}
diff --git a/tests/cppunit/test_base.h b/tests/cppunit/test_base.h
index 4bfd0c32..043b9329 100644
--- a/tests/cppunit/test_base.h
+++ b/tests/cppunit/test_base.h
@@ -23,6 +23,7 @@
 #include <gtest/gtest.h>
 
 #include <filesystem>
+#include <fstream>
 
 #include "storage/redis_db.h"
 #include "types/redis_hash.h"
@@ -30,13 +31,19 @@
 class TestBase : public testing::Test {  // NOLINT
  protected:
   explicit TestBase() : config_(new Config()) {
+    const char *path = "test.conf";
+    unlink(path);
+    std::ofstream output_file(path, std::ios::out);
+    output_file << "";
+
+    auto s = config_->Load(CLIOptions(path));
     config_->db_dir = "testdb";
     config_->backup_dir = "testdb/backup";
     config_->rocks_db.compression = rocksdb::CompressionType::kNoCompression;
     config_->rocks_db.write_buffer_size = 1;
     config_->rocks_db.block_size = 100;
     storage_ = new engine::Storage(config_);
-    Status s = storage_->Open();
+    s = storage_->Open();
     if (!s.IsOK()) {
       std::cout << "Failed to open the storage, encounter error: " << s.Msg() 
<< std::endl;
       assert(s.IsOK());
diff --git a/tests/gocase/unit/namespace/namespace_test.go 
b/tests/gocase/unit/namespace/namespace_test.go
index 1312217c..bc04a7c3 100644
--- a/tests/gocase/unit/namespace/namespace_test.go
+++ b/tests/gocase/unit/namespace/namespace_test.go
@@ -56,8 +56,8 @@ func TestNamespace(t *testing.T) {
                }
                // duplicate add the same namespace
                for ns, token := range nsTokens {
-                       r := rdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
-                       util.ErrorRegexp(t, r.Err(), ".*ERR the token has 
already exists.*")
+                       r := rdb.Do(ctx, "NAMESPACE", "ADD", ns, "new"+token)
+                       util.ErrorRegexp(t, r.Err(), ".*ERR the namespace 
already exists.*")
                }
                for ns, token := range nsTokens {
                        r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
@@ -71,6 +71,44 @@ func TestNamespace(t *testing.T) {
                }
        })
 
+       t.Run("Namespace exists after restart", func(t *testing.T) {
+               for _, enableNamespaceReplication := range []string{"no", 
"yes"} {
+                       require.NoError(t, rdb.ConfigSet(ctx,
+                               "repl-namespace-enabled",
+                               enableNamespaceReplication,
+                       ).Err())
+                       require.NoError(t, rdb.ConfigRewrite(ctx).Err())
+                       nsTokens := map[string]string{
+                               "n1": "t1",
+                               "n2": "t2",
+                               "n3": "t3",
+                               "n4": "t4",
+                       }
+                       for ns, token := range nsTokens {
+                               r := rdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
+                               require.NoError(t, r.Err())
+                               require.Equal(t, "OK", r.Val())
+                       }
+                       for ns, token := range nsTokens {
+                               r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
+                               require.NoError(t, r.Err())
+                               require.Equal(t, token, r.Val())
+                       }
+
+                       srv.Restart()
+                       for ns, token := range nsTokens {
+                               r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
+                               require.NoError(t, r.Err())
+                               require.Equal(t, token, r.Val())
+                       }
+                       for ns := range nsTokens {
+                               r := rdb.Do(ctx, "NAMESPACE", "DEL", ns)
+                               require.NoError(t, r.Err())
+                               require.Equal(t, "OK", r.Val())
+                       }
+               }
+       })
+
        t.Run("Concurrent creating namespaces", func(t *testing.T) {
                threads := 4
                countPerThread := 10
@@ -101,3 +139,105 @@ func TestNamespace(t *testing.T) {
                })
        })
 }
+
+func TestNamespaceReplicate(t *testing.T) {
+       password := "pwd"
+       masterSrv := util.StartServer(t, map[string]string{
+               "requirepass": password,
+       })
+       defer masterSrv.Close()
+       masterRdb := masterSrv.NewClientWithOption(&redis.Options{
+               Password: password,
+       })
+       defer func() { require.NoError(t, masterRdb.Close()) }()
+
+       slaveSrv := util.StartServer(t, map[string]string{
+               "masterauth":  password,
+               "requirepass": password,
+       })
+       defer slaveSrv.Close()
+       slaveRdb := slaveSrv.NewClientWithOption(&redis.Options{
+               Password: password,
+       })
+       defer func() { require.NoError(t, slaveRdb.Close()) }()
+
+       util.SlaveOf(t, slaveRdb, masterSrv)
+       util.WaitForSync(t, slaveRdb)
+
+       ctx := context.Background()
+       nsTokens := map[string]string{
+               "n1": "t1",
+               "n2": "t2",
+               "n3": "t3",
+               "n4": "t4",
+       }
+
+       t.Run("Disable Replicate namespces", func(t *testing.T) {
+               require.NoError(t, masterRdb.ConfigSet(ctx, 
"repl-namespace-enabled", "no").Err())
+               require.NoError(t, slaveRdb.ConfigSet(ctx, 
"repl-namespace-enabled", "no").Err())
+
+               for ns, token := range nsTokens {
+                       r := masterRdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
+                       require.NoError(t, r.Err())
+                       require.Equal(t, "OK", r.Val())
+               }
+               util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+
+               // Can read namespaces on master
+               for ns, token := range nsTokens {
+                       r := masterRdb.Do(ctx, "NAMESPACE", "GET", ns)
+                       require.NoError(t, r.Err())
+                       require.Equal(t, token, r.Val())
+               }
+               // Can't read namespaces on slave
+               for ns := range nsTokens {
+                       r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
+                       require.EqualError(t, r.Err(), redis.Nil.Error())
+               }
+
+               for ns := range nsTokens {
+                       r := masterRdb.Do(ctx, "NAMESPACE", "DEL", ns)
+                       require.NoError(t, r.Err())
+                       require.Equal(t, "OK", r.Val())
+               }
+       })
+
+       t.Run("Enable Replicate namespaces", func(t *testing.T) {
+               require.NoError(t, masterRdb.ConfigSet(ctx, 
"repl-namespace-enabled", "yes").Err())
+               require.NoError(t, slaveRdb.ConfigSet(ctx, 
"repl-namespace-enabled", "yes").Err())
+
+               for ns, token := range nsTokens {
+                       r := masterRdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
+                       require.NoError(t, r.Err())
+                       require.Equal(t, "OK", r.Val())
+               }
+               util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+
+               for ns, token := range nsTokens {
+                       r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
+                       require.NoError(t, r.Err())
+                       require.Equal(t, token, r.Val())
+               }
+
+               for ns := range nsTokens {
+                       r := masterRdb.Do(ctx, "NAMESPACE", "DEL", ns)
+                       require.NoError(t, r.Err())
+                       require.Equal(t, "OK", r.Val())
+               }
+               util.WaitForOffsetSync(t, slaveRdb, masterRdb)
+
+               for ns := range nsTokens {
+                       r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
+                       require.EqualError(t, r.Err(), redis.Nil.Error())
+               }
+       })
+
+       t.Run("Don't allow to operate slave's namespace if replication is 
enabled", func(t *testing.T) {
+               r := slaveRdb.Do(ctx, "NAMESPACE", "ADD", "ns_xxxx", 
"token_xxxx")
+               util.ErrorRegexp(t, r.Err(), ".*ERR namespace is read-only for 
slave.*")
+       })
+
+       t.Run("Turn off namespace replication is not allowed", func(t 
*testing.T) {
+               util.ErrorRegexp(t, masterRdb.ConfigSet(ctx, 
"repl-namespace-enabled", "no").Err(), ".*cannot switch off 
repl_namespace_enabled when namespaces exist in db.*")
+       })
+}


Reply via email to