This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 7e1b7979 Fix wrongly try to rewrite the namespace in the cluster mode 
(#2221)
7e1b7979 is described below

commit 7e1b797920517edf9c798cd4b440789b524d7956
Author: hulk <[email protected]>
AuthorDate: Fri Apr 5 09:40:59 2024 +0800

    Fix wrongly try to rewrite the namespace in the cluster mode (#2221)
    
    This closes #2214
    
    The namespace mechanism is NOT allowed in cluster mode, so it's
    unnecessary to rewrite while the cluster mode is enabled. This
    config rewrite behavior will cause the replication issue
    mentioned in #2214 when starting the cluster node.
    
    The root cause is that the server will try to rewrite the namespace
    into the rocksdb if the option `repl-namespace-enabled` is enabled.
    So it will increase the server's rocksdb sequence and replication will
    start with the wrong offset. We have checked if the role is a slave
    before rewriting, but the cluster replication is NOT set at that time
    (master-replica is good).
    
    The good news is it only affects the cluster users who enabled
    the option `repl-namespace-enabled`, so I guess almost no user
    will do this since the namespace replication is meaningless to the cluster 
mode.
    
    ```
    === RUN   
TestClusterReplication/Cluster_replication_should_work_normally_after_restart
        replication_test.go:88:
                    Error Trace:    
/Users/hulk/code/cxx/kvrocks/tests/gocase/integration/replication/replication_test.go:88
                    Error:          Not equal:
                                    expected: "v1"
                                    actual  : "v0"
    ```
    
    And it works well after this patch.
---
 src/server/namespace.cc                            |  3 ++
 src/storage/redis_pubsub.cc                        |  3 ++
 src/storage/storage.cc                             |  3 ++
 .../integration/replication/replication_test.go    | 58 ++++++++++++++++++++++
 tests/gocase/util/server.go                        |  3 ++
 5 files changed, 70 insertions(+)

diff --git a/src/server/namespace.cc b/src/server/namespace.cc
index 01a44dcf..504ad523 100644
--- a/src/server/namespace.cc
+++ b/src/server/namespace.cc
@@ -53,6 +53,9 @@ bool Namespace::IsAllowModify() const {
 
 Status Namespace::LoadAndRewrite() {
   auto config = storage_->GetConfig();
+  // Namespace is NOT allowed in the cluster mode, so we don't need to rewrite 
here.
+  if (config->cluster_enabled) return Status::OK();
+
   // Load from the configuration file first
   tokens_ = config->load_tokens;
 
diff --git a/src/storage/redis_pubsub.cc b/src/storage/redis_pubsub.cc
index 52264ff9..6ca153b7 100644
--- a/src/storage/redis_pubsub.cc
+++ b/src/storage/redis_pubsub.cc
@@ -23,6 +23,9 @@
 namespace redis {
 
 rocksdb::Status PubSub::Publish(const Slice &channel, const Slice &value) {
+  if (storage_->GetConfig()->IsSlave()) {
+    return rocksdb::Status::NotSupported("can't publish to db in slave mode");
+  }
   auto batch = storage_->GetWriteBatchBase();
   batch->Put(pubsub_cf_handle_, channel, value);
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 3074664e..2133645c 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -863,6 +863,9 @@ ObserverOrUniquePtr<rocksdb::WriteBatchBase> 
Storage::GetWriteBatchBase() {
 }
 
 Status Storage::WriteToPropagateCF(const std::string &key, const std::string 
&value) {
+  if (config_->IsSlave()) {
+    return {Status::NotOK, "cannot write to propagate column family in slave 
mode"};
+  }
   auto batch = GetWriteBatchBase();
   auto cf = GetCFHandle(kPropagateColumnFamilyName);
   batch->Put(cf, key, value);
diff --git a/tests/gocase/integration/replication/replication_test.go 
b/tests/gocase/integration/replication/replication_test.go
index 20de3581..71e08c77 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -32,6 +32,64 @@ import (
        "github.com/stretchr/testify/require"
 )
 
+func TestClusterReplication(t *testing.T) {
+       ctx := context.Background()
+
+       masterSrv := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { masterSrv.Close() }()
+       masterClient := masterSrv.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterNodeID).Err())
+
+       replicaSrv := util.StartServer(t, map[string]string{
+               "cluster-enabled": "yes",
+               // enabled the replication namespace to reproduce the issue 
#2214
+               "repl-namespace-enabled": "yes",
+       })
+       defer func() { replicaSrv.Close() }()
+       replicaClient := replicaSrv.NewClient()
+       // allow to run the read-only command in the replica
+       require.NoError(t, replicaClient.ReadOnly(ctx).Err())
+       defer func() { require.NoError(t, replicaClient.Close()) }()
+       replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", 
replicaNodeID).Err())
+
+       clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", 
masterNodeID, masterSrv.Port())
+       clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s", 
clusterNodes, replicaNodeID, replicaSrv.Port(), masterNodeID)
+
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       t.Run("Cluster replication should work", func(t *testing.T) {
+               util.WaitForSync(t, replicaClient)
+               require.Equal(t, "slave", util.FindInfoEntry(replicaClient, 
"role"))
+               masterClient.Set(ctx, "k0", "v0", 0)
+               masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
+               util.WaitForOffsetSync(t, masterClient, replicaClient)
+
+               require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
+               require.Equal(t, []string{"e2", "e1", "e0"}, 
replicaClient.LRange(ctx, "k1", 0, -1).Val())
+       })
+
+       t.Run("Cluster replication should work normally after restart(issue 
#2214)", func(t *testing.T) {
+               replicaSrv.Close()
+               masterClient.Set(ctx, "k0", "v1", 0)
+               masterClient.HSet(ctx, "k2", "f0", "v0", "f1", "v1")
+
+               // start the replica server again
+               replicaSrv.Start()
+               _ = replicaClient.Close()
+               replicaClient = replicaSrv.NewClient()
+               // allow to run the read-only command in the replica
+               require.NoError(t, replicaClient.ReadOnly(ctx).Err())
+
+               util.WaitForOffsetSync(t, masterClient, replicaClient)
+               require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
+               require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"}, 
replicaClient.HGetAll(ctx, "k2").Val())
+       })
+}
+
 func TestReplicationWithHostname(t *testing.T) {
        srvA := util.StartServer(t, map[string]string{})
        defer srvA.Close()
diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go
index a3f4314e..849b47ad 100644
--- a/tests/gocase/util/server.go
+++ b/tests/gocase/util/server.go
@@ -136,7 +136,10 @@ func (s *KvrocksServer) close(keepDir bool) {
 
 func (s *KvrocksServer) Restart() {
        s.close(true)
+       s.Start()
+}
 
+func (s *KvrocksServer) Start() {
        b := *binPath
        require.NotEmpty(s.t, b, "please set the binary path by `-binPath`")
        cmd := exec.Command(b)

Reply via email to