This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 7e1b7979 Fix wrongly try to rewrite the namespace in the cluster mode
(#2221)
7e1b7979 is described below
commit 7e1b797920517edf9c798cd4b440789b524d7956
Author: hulk <[email protected]>
AuthorDate: Fri Apr 5 09:40:59 2024 +0800
Fix wrongly try to rewrite the namespace in the cluster mode (#2221)
This closes #2214
The namespace mechanism is NOT allowed in cluster mode, so it's
unnecessary to rewrite the namespace while cluster mode is enabled. This
config rewrite behavior causes the replication issue
mentioned in #2214 when starting a cluster node.
The root cause is that the server tries to rewrite the namespace
into RocksDB if the option `repl-namespace-enabled` is enabled.
This increases the server's RocksDB sequence number, so replication
starts from the wrong offset. We check whether the role is a slave
before rewriting, but the cluster replication topology is NOT set at that
time (the master-replica case works fine).
The good news is that it only affects cluster users who enabled
the option `repl-namespace-enabled`; almost no user is likely to
do this, since namespace replication is meaningless in cluster
mode.
```
=== RUN
TestClusterReplication/Cluster_replication_should_work_normally_after_restart
replication_test.go:88:
Error Trace:
/Users/hulk/code/cxx/kvrocks/tests/gocase/integration/replication/replication_test.go:88
Error: Not equal:
expected: "v1"
actual : "v0"
```
And it works well after this patch.
---
src/server/namespace.cc | 3 ++
src/storage/redis_pubsub.cc | 3 ++
src/storage/storage.cc | 3 ++
.../integration/replication/replication_test.go | 58 ++++++++++++++++++++++
tests/gocase/util/server.go | 3 ++
5 files changed, 70 insertions(+)
diff --git a/src/server/namespace.cc b/src/server/namespace.cc
index 01a44dcf..504ad523 100644
--- a/src/server/namespace.cc
+++ b/src/server/namespace.cc
@@ -53,6 +53,9 @@ bool Namespace::IsAllowModify() const {
Status Namespace::LoadAndRewrite() {
auto config = storage_->GetConfig();
+ // Namespace is NOT allowed in the cluster mode, so we don't need to rewrite
here.
+ if (config->cluster_enabled) return Status::OK();
+
// Load from the configuration file first
tokens_ = config->load_tokens;
diff --git a/src/storage/redis_pubsub.cc b/src/storage/redis_pubsub.cc
index 52264ff9..6ca153b7 100644
--- a/src/storage/redis_pubsub.cc
+++ b/src/storage/redis_pubsub.cc
@@ -23,6 +23,9 @@
namespace redis {
rocksdb::Status PubSub::Publish(const Slice &channel, const Slice &value) {
+ if (storage_->GetConfig()->IsSlave()) {
+ return rocksdb::Status::NotSupported("can't publish to db in slave mode");
+ }
auto batch = storage_->GetWriteBatchBase();
batch->Put(pubsub_cf_handle_, channel, value);
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 3074664e..2133645c 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -863,6 +863,9 @@ ObserverOrUniquePtr<rocksdb::WriteBatchBase>
Storage::GetWriteBatchBase() {
}
Status Storage::WriteToPropagateCF(const std::string &key, const std::string
&value) {
+ if (config_->IsSlave()) {
+ return {Status::NotOK, "cannot write to propagate column family in slave
mode"};
+ }
auto batch = GetWriteBatchBase();
auto cf = GetCFHandle(kPropagateColumnFamilyName);
batch->Put(cf, key, value);
diff --git a/tests/gocase/integration/replication/replication_test.go
b/tests/gocase/integration/replication/replication_test.go
index 20de3581..71e08c77 100644
--- a/tests/gocase/integration/replication/replication_test.go
+++ b/tests/gocase/integration/replication/replication_test.go
@@ -32,6 +32,64 @@ import (
"github.com/stretchr/testify/require"
)
+func TestClusterReplication(t *testing.T) {
+ ctx := context.Background()
+
+ masterSrv := util.StartServer(t, map[string]string{"cluster-enabled":
"yes"})
+ defer func() { masterSrv.Close() }()
+ masterClient := masterSrv.NewClient()
+ defer func() { require.NoError(t, masterClient.Close()) }()
+ masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID",
masterNodeID).Err())
+
+ replicaSrv := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ // enabled the replication namespace to reproduce the issue
#2214
+ "repl-namespace-enabled": "yes",
+ })
+ defer func() { replicaSrv.Close() }()
+ replicaClient := replicaSrv.NewClient()
+ // allow to run the read-only command in the replica
+ require.NoError(t, replicaClient.ReadOnly(ctx).Err())
+ defer func() { require.NoError(t, replicaClient.Close()) }()
+ replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID",
replicaNodeID).Err())
+
+ clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383",
masterNodeID, masterSrv.Port())
+ clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s",
clusterNodes, replicaNodeID, replicaSrv.Port(), masterNodeID)
+
+ require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
+ require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES",
clusterNodes, "1").Err())
+
+ t.Run("Cluster replication should work", func(t *testing.T) {
+ util.WaitForSync(t, replicaClient)
+ require.Equal(t, "slave", util.FindInfoEntry(replicaClient,
"role"))
+ masterClient.Set(ctx, "k0", "v0", 0)
+ masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
+ util.WaitForOffsetSync(t, masterClient, replicaClient)
+
+ require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
+ require.Equal(t, []string{"e2", "e1", "e0"},
replicaClient.LRange(ctx, "k1", 0, -1).Val())
+ })
+
+ t.Run("Cluster replication should work normally after restart(issue
#2214)", func(t *testing.T) {
+ replicaSrv.Close()
+ masterClient.Set(ctx, "k0", "v1", 0)
+ masterClient.HSet(ctx, "k2", "f0", "v0", "f1", "v1")
+
+ // start the replica server again
+ replicaSrv.Start()
+ _ = replicaClient.Close()
+ replicaClient = replicaSrv.NewClient()
+ // allow to run the read-only command in the replica
+ require.NoError(t, replicaClient.ReadOnly(ctx).Err())
+
+ util.WaitForOffsetSync(t, masterClient, replicaClient)
+ require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
+ require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"},
replicaClient.HGetAll(ctx, "k2").Val())
+ })
+}
+
func TestReplicationWithHostname(t *testing.T) {
srvA := util.StartServer(t, map[string]string{})
defer srvA.Close()
diff --git a/tests/gocase/util/server.go b/tests/gocase/util/server.go
index a3f4314e..849b47ad 100644
--- a/tests/gocase/util/server.go
+++ b/tests/gocase/util/server.go
@@ -136,7 +136,10 @@ func (s *KvrocksServer) close(keepDir bool) {
func (s *KvrocksServer) Restart() {
s.close(true)
+ s.Start()
+}
+func (s *KvrocksServer) Start() {
b := *binPath
require.NotEmpty(s.t, b, "please set the binary path by `-binPath`")
cmd := exec.Command(b)