This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new d77ea0f Add the support of using Apache Zookeeper as an alternative
storage (#126)
d77ea0f is described below
commit d77ea0f2fe864d5a1466b3fb9c64bcb828a2b32f
Author: 纪华裕 <[email protected]>
AuthorDate: Sat Dec 2 22:21:02 2023 +0800
Add the support of using Apache Zookeeper as an alternative storage (#126)
---
config/config.go | 11 +-
config/config.yaml | 14 ++
go.mod | 1 +
go.sum | 2 +
scripts/docker/docker-compose.yml | 6 +
server/server.go | 21 +-
storage/persistence/zookeeper/zookeeper.go | 271 ++++++++++++++++++++++++
storage/persistence/zookeeper/zookeeper_test.go | 107 ++++++++++
8 files changed, 428 insertions(+), 5 deletions(-)
diff --git a/config/config.go b/config/config.go
index 656ee80..c0370df 100644
--- a/config/config.go
+++ b/config/config.go
@@ -26,6 +26,7 @@ import (
"os"
"github.com/apache/kvrocks-controller/storage/persistence/etcd"
+ "github.com/apache/kvrocks-controller/storage/persistence/zookeeper"
"github.com/go-playground/validator/v10"
)
@@ -48,10 +49,12 @@ type ControllerConfig struct {
const defaultPort = 9379
type Config struct {
- Addr string `yaml:"addr"`
- Etcd *etcd.Config `yaml:"etcd"`
- Admin AdminConfig `yaml:"admin"`
- Controller *ControllerConfig `yaml:"controller"`
+ Addr string `yaml:"addr"`
+ StorageType string `yaml:"storage_type"`
+ Etcd *etcd.Config `yaml:"etcd"`
+ Zookeeper *zookeeper.Config `yaml:"zookeeper"`
+ Admin AdminConfig `yaml:"admin"`
+ Controller *ControllerConfig `yaml:"controller"`
}
func Default() *Config {
diff --git a/config/config.yaml b/config/config.yaml
index ad7b24b..d09fdf4 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -18,17 +18,31 @@
addr: "127.0.0.1:9379"
+
+# Which storage engine should be used by controller
+# options: etcd, zookeeper
+# default: etcd
+storage_type: etcd
+
etcd:
addrs:
- "127.0.0.1:2379"
username:
password:
+ elect_path:
tls:
enable: false
cert_file:
key_file:
ca_file:
+zookeeper:
+ addrs:
+ - "127.0.0.1:2181"
+ scheme:
+ auth:
+ elect_path:
+
controller:
failover:
gc_interval_seconds: 3600
diff --git a/go.mod b/go.mod
index 1ba9483..364a538 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
github.com/go-playground/validator/v10 v10.9.0
github.com/go-redis/redis/v8 v8.11.5
github.com/go-resty/resty/v2 v2.7.0
+ github.com/go-zookeeper/zk v1.0.3
github.com/google/uuid v1.3.0
github.com/jedib0t/go-pretty/v6 v6.4.6
github.com/prometheus/client_golang v1.11.1
diff --git a/go.sum b/go.sum
index 233ddae..9c55cf5 100644
--- a/go.sum
+++ b/go.sum
@@ -72,6 +72,8 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod
h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq
github.com/go-resty/resty/v2 v2.7.0
h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod
h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-stack/stack v1.8.0/go.mod
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-zookeeper/zk v1.0.3
h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
+github.com/go-zookeeper/zk v1.0.3/go.mod
h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.4/go.mod
h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
diff --git a/scripts/docker/docker-compose.yml
b/scripts/docker/docker-compose.yml
index c681c05..d483760 100644
--- a/scripts/docker/docker-compose.yml
+++ b/scripts/docker/docker-compose.yml
@@ -35,3 +35,9 @@ services:
- ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
- ETCD_INITIAL_CLUSTER=etcd0=http://etcd0:2380
- ETCD_INITIAL_CLUSTER_STATE=new
+ zookeeper0:
+ image: "zookeeper:latest"
+ container_name: zookeeper0
+ ports:
+ - "2181:2181"
+
diff --git a/server/server.go b/server/server.go
index f460c09..336d647 100644
--- a/server/server.go
+++ b/server/server.go
@@ -24,13 +24,17 @@ import (
"fmt"
"net/http"
"net/http/pprof"
+ "strings"
"time"
"github.com/apache/kvrocks-controller/config"
"github.com/apache/kvrocks-controller/controller"
"github.com/apache/kvrocks-controller/controller/probe"
+ "github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/storage"
+ "github.com/apache/kvrocks-controller/storage/persistence"
"github.com/apache/kvrocks-controller/storage/persistence/etcd"
+ "github.com/apache/kvrocks-controller/storage/persistence/zookeeper"
"github.com/gin-gonic/gin"
)
@@ -44,11 +48,26 @@ type Server struct {
}
func NewServer(cfg *config.Config) (*Server, error) {
+ var persist persistence.Persistence
+ var err error
+ switch {
+ case strings.EqualFold(cfg.StorageType, "etcd"):
+ logger.Get().Info("Use Etcd as storage")
+ persist, err = etcd.New(cfg.Addr, cfg.Etcd)
+ case strings.EqualFold(cfg.StorageType, "zookeeper"):
+ logger.Get().Info("Use Zookeeper as storage")
+ persist, err = zookeeper.New(cfg.Addr, cfg.Zookeeper)
+ default:
+ logger.Get().Info("Use Etcd as default storage")
+ persist, err = etcd.New(cfg.Addr, cfg.Etcd)
+ }
- persist, err := etcd.New(cfg.Addr, cfg.Etcd)
if err != nil {
return nil, err
}
+ if persist == nil {
+ return nil, fmt.Errorf("no found any storage config")
+ }
storage, err := storage.NewStorage(persist)
if err != nil {
return nil, err
diff --git a/storage/persistence/zookeeper/zookeeper.go
b/storage/persistence/zookeeper/zookeeper.go
new file mode 100644
index 0000000..5b00a6e
--- /dev/null
+++ b/storage/persistence/zookeeper/zookeeper.go
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package zookeeper
+
+import (
+ "context"
+ "errors"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/kvrocks-controller/logger"
+ "github.com/apache/kvrocks-controller/storage/persistence"
+ "github.com/go-zookeeper/zk"
+ "go.uber.org/atomic"
+)
+
+const (
+ // sessionTTL is the session timeout handed to zk.Connect; the election loop
+ // also sleeps for sessionTTL / 3 before retrying after a failure.
+ sessionTTL = 6 * time.Second
+)
+
+// defaultElectPath is the election node path used when Config.ElectPath is empty.
+const defaultElectPath = "/kvrocks/controller/leader"
+
+// Config holds the settings used by New to build a Zookeeper-backed persistence.
+type Config struct {
+ // Addrs is the list of Zookeeper server addresses passed to zk.Connect.
+ Addrs []string `yaml:"addrs"`
+ // Scheme and Auth are passed to conn.AddAuth when both are non-empty; on
+ // success they also form the ACL applied to nodes created by this client.
+ Scheme string `yaml:"scheme"`
+ Auth string `yaml:"auth"`
+ // ElectPath overrides defaultElectPath for the leader election node when set.
+ ElectPath string `yaml:"elect_path"`
+}
+
+// Zookeeper is a persistence backend that stores keys as Zookeeper nodes and
+// takes part in leader election through an ephemeral node at electPath.
+type Zookeeper struct {
+	// conn is the connection returned by zk.Connect.
+	conn *zk.Conn
+	// acl is the ACL set on every node this client creates.
+	acl []zk.ACL
+	// leaderMu guards leaderID.
+	leaderMu sync.RWMutex
+	// leaderID is the last leader ID read from the election node.
+	leaderID string
+	// myID is the ID this instance writes into the election node.
+	myID string
+	// electPath is the path of the election node.
+	electPath string
+	// isReady becomes true once a leader ID has been recorded.
+	isReady atomic.Bool
+	// quitCh is closed by Close to stop the election loop.
+	quitCh chan struct{}
+	// leaderChangeCh receives true whenever the recorded leader changes.
+	leaderChangeCh chan bool
+	// wg tracks the election loop goroutine so Close can wait for it.
+	wg sync.WaitGroup
+}
+
+// New connects to the Zookeeper servers listed in cfg.Addrs and starts the
+// background leader election loop on behalf of the controller identified by id.
+//
+// When both cfg.Scheme and cfg.Auth are set they are registered on the
+// connection with AddAuth and used as the ACL for created nodes; if AddAuth
+// fails, a warning is logged and the world ACL with all permissions is kept.
+//
+// It returns an error when id is empty, when cfg is nil, or when zk.Connect
+// fails.
+func New(id string, cfg *Config) (*Zookeeper, error) {
+	if len(id) == 0 {
+		return nil, errors.New("id must NOT be a empty string")
+	}
+	// Guard against a nil config instead of panicking on the first field access.
+	if cfg == nil {
+		return nil, errors.New("zookeeper config must NOT be nil")
+	}
+	conn, _, err := zk.Connect(cfg.Addrs, sessionTTL)
+	if err != nil {
+		return nil, err
+	}
+
+	electPath := cfg.ElectPath
+	if electPath == "" {
+		electPath = defaultElectPath
+	}
+
+	acl := zk.WorldACL(zk.PermAll)
+	if cfg.Scheme != "" && cfg.Auth != "" {
+		if authErr := conn.AddAuth(cfg.Scheme, []byte(cfg.Auth)); authErr != nil {
+			logger.Get().Warn("Zookeeper addAuth fail: " + authErr.Error())
+		} else {
+			acl = []zk.ACL{{Perms: zk.PermAll, Scheme: cfg.Scheme, ID: cfg.Auth}}
+		}
+	}
+
+	// isReady and wg are usable at their zero values, so they need no explicit
+	// initialisation here.
+	e := &Zookeeper{
+		myID:           id,
+		acl:            acl,
+		electPath:      electPath,
+		conn:           conn,
+		quitCh:         make(chan struct{}),
+		leaderChangeCh: make(chan bool),
+	}
+	e.wg.Add(1)
+	go e.electLoop(context.Background())
+	return e, nil
+}
+
+// ID returns the identifier this instance was created with.
+func (e *Zookeeper) ID() string {
+ return e.myID
+}
+
+// Leader returns the most recently recorded leader ID. It is the empty string
+// until a leader has been recorded.
+func (e *Zookeeper) Leader() string {
+	e.leaderMu.RLock()
+	current := e.leaderID
+	e.leaderMu.RUnlock()
+	return current
+}
+
+// LeaderChange returns the receive side of the channel on which true is sent
+// each time the recorded leader ID changes.
+func (e *Zookeeper) LeaderChange() <-chan bool {
+ return e.leaderChangeCh
+}
+
+// IsReady blocks until a leader has been recorded, the instance is closed, or
+// ctx is done.
+//
+// It returns true once the ready flag is set, false when the instance has been
+// closed, and the current value of the ready flag when ctx is done first.
+func (e *Zookeeper) IsReady(ctx context.Context) bool {
+	// Fast path: answer immediately instead of always waiting one poll interval.
+	select {
+	case <-e.quitCh:
+		return false
+	default:
+	}
+	if e.isReady.Load() {
+		return true
+	}
+
+	// A single ticker replaces the per-iteration time.After timer.
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-e.quitCh:
+			return false
+		case <-ticker.C:
+			if e.isReady.Load() {
+				return true
+			}
+		case <-ctx.Done():
+			return e.isReady.Load()
+		}
+	}
+}
+
+// Get returns the data stored at key. A missing node is not an error: it
+// yields a nil value and a nil error.
+func (e *Zookeeper) Get(ctx context.Context, key string) ([]byte, error) {
+	value, _, err := e.conn.Get(key)
+	switch {
+	case err == nil:
+		return value, nil
+	case errors.Is(err, zk.ErrNoNode):
+		// The key does not exist.
+		return nil, nil
+	default:
+		return nil, err
+	}
+}
+
+// Exists reports whether a node is present at key. On error it reports false
+// together with the error.
+func (e *Zookeeper) Exists(ctx context.Context, key string) (bool, error) {
+	found, _, err := e.conn.Exists(key)
+	if err == nil {
+		return found, nil
+	}
+	return false, err
+}
+
+// Set stores value at key: an existing node is overwritten regardless of its
+// version, and a missing node is created (with its missing parents) as a
+// persistent node.
+//
+// The write is attempted first and the node is only created when Zookeeper
+// reports it missing, so connection errors are returned as-is and a node
+// created or deleted concurrently does not make the call fail spuriously.
+func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error {
+	_, err := e.conn.Set(key, value, -1)
+	if !errors.Is(err, zk.ErrNoNode) {
+		return err
+	}
+
+	err = e.Create(ctx, key, value, 0)
+	if errors.Is(err, zk.ErrNodeExists) {
+		// Another client created the node after our Set; overwrite it.
+		_, err = e.conn.Set(key, value, -1)
+	}
+	return err
+}
+
+// Create creates the node at key with the given value and flags, using the
+// ACL chosen in New. Missing parent nodes are created first, recursively, as
+// flag-less nodes with empty data.
+//
+// The error from the final create is returned unchanged, so callers can detect
+// an already existing node with errors.Is(err, zk.ErrNodeExists).
+func (e *Zookeeper) Create(ctx context.Context, key string, value []byte, flags int32) error {
+	if idx := strings.LastIndex(key, "/"); idx > 0 {
+		parent := key[:idx]
+		exist, err := e.Exists(ctx, parent)
+		if err != nil {
+			return err
+		}
+		if !exist {
+			// A parent created concurrently by another client is fine: all we
+			// need is for it to exist before the child is created.
+			err := e.Create(ctx, parent, []byte{}, 0)
+			if err != nil && !errors.Is(err, zk.ErrNodeExists) {
+				return err
+			}
+		}
+	}
+	_, err := e.conn.Create(key, value, flags, e.acl)
+	return err
+}
+
+// Delete removes the node at key regardless of its version. Deleting a node
+// that does not exist is not an error.
+func (e *Zookeeper) Delete(ctx context.Context, key string) error {
+	if err := e.conn.Delete(key, -1); err != nil && !errors.Is(err, zk.ErrNoNode) {
+		return err
+	}
+	// Either the delete succeeded or the key did not exist.
+	return nil
+}
+
+// List returns one entry per direct child of prefix. Each entry's Key is the
+// child's name (not its full path) and its Value is the child's data.
+//
+// A missing prefix node yields an empty, non-nil slice. Children that vanish
+// between the listing and the read of their data are skipped.
+func (e *Zookeeper) List(ctx context.Context, prefix string) ([]persistence.Entry, error) {
+	children, _, err := e.conn.Children(prefix)
+	if errors.Is(err, zk.ErrNoNode) {
+		return []persistence.Entry{}, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// Avoid building "//child" when the prefix is the root node.
+	base := strings.TrimSuffix(prefix, "/")
+	entries := make([]persistence.Entry, 0, len(children))
+	for _, child := range children {
+		data, _, err := e.conn.Get(base + "/" + child)
+		if errors.Is(err, zk.ErrNoNode) {
+			// Removed after Children returned; no longer part of the listing.
+			continue
+		}
+		if err != nil {
+			return nil, err
+		}
+		entries = append(entries, persistence.Entry{Key: child, Value: data})
+	}
+	return entries, nil
+}
+
+// SetleaderID records newLeaderID as the current leader. Empty or unchanged
+// IDs are ignored. On a change it marks the instance ready and sends true on
+// the leader-change channel.
+//
+// The send blocks until a receiver takes the value or the instance is closed,
+// so an absent receiver cannot keep the election loop (and therefore Close)
+// stuck forever.
+func (e *Zookeeper) SetleaderID(newLeaderID string) {
+	if newLeaderID == "" {
+		return
+	}
+
+	// Compare and update under the same lock that readers of leaderID use.
+	e.leaderMu.Lock()
+	changed := newLeaderID != e.leaderID
+	if changed {
+		e.leaderID = newLeaderID
+	}
+	e.leaderMu.Unlock()
+	if !changed {
+		return
+	}
+
+	// Publish readiness only after the leader ID is stored, so a caller that
+	// sees IsReady return true never reads an empty Leader.
+	e.isReady.Store(true)
+	select {
+	case e.leaderChangeCh <- true:
+	case <-e.quitCh:
+	}
+}
+
+// electLoop runs the leader election until quitCh is closed. It tries to
+// create an ephemeral node at electPath holding myID, then reads the node with
+// a watch and records whatever ID it contains as the leader. Each watch event
+// re-reads the node; a deletion event first triggers another attempt to create
+// it. Any unexpected error sleeps for sessionTTL / 3 and restarts from the
+// reset label.
+func (e *Zookeeper) electLoop(ctx context.Context) {
+ defer e.wg.Done()
+reset:
+ // Stop retrying once the instance has been closed.
+ select {
+ case <-e.quitCh:
+ return
+ default:
+ }
+ // ErrNodeExists is not a failure here: some node already holds the election path.
+ err := e.Create(ctx, e.electPath, []byte(e.myID), zk.FlagEphemeral)
+ if err != nil && !errors.Is(err, zk.ErrNodeExists) {
+ time.Sleep(sessionTTL / 3)
+ goto reset
+ }
+ // Read the current leader and register a watch on the election node.
+ data, _, ch, err := e.conn.GetW(e.electPath)
+ if err != nil {
+ time.Sleep(sessionTTL / 3)
+ goto reset
+ }
+ e.SetleaderID(string(data))
+
+ for {
+ select {
+ case resp := <-ch:
+ // The election node was deleted: try to take it over.
+ if resp.Type == zk.EventNodeDeleted {
+ err := e.Create(ctx, e.electPath,
[]byte(e.myID), zk.FlagEphemeral)
+ if err != nil && !errors.Is(err,
zk.ErrNodeExists) {
+ time.Sleep(sessionTTL / 3)
+ goto reset
+ }
+ }
+ // Re-read the node and replace ch with the channel of the new watch.
+ data, _, ch, err = e.conn.GetW(e.electPath)
+ if err != nil {
+ time.Sleep(sessionTTL / 3)
+ goto reset
+ }
+ e.SetleaderID(string(data))
+ case <-e.quitCh:
+ logger.Get().Info(e.myID + " Exit the leader election
loop")
+ return
+ }
+
+ }
+
+}
+
+// Close stops the election loop by closing quitCh, waits for the loop to exit,
+// and then closes the Zookeeper connection. It always returns nil.
+// NOTE(review): quitCh is closed unconditionally, so a second call to Close
+// would panic on the already closed channel.
+func (e *Zookeeper) Close() error {
+ close(e.quitCh)
+ e.wg.Wait()
+ e.conn.Close()
+ return nil
+}
diff --git a/storage/persistence/zookeeper/zookeeper_test.go
b/storage/persistence/zookeeper/zookeeper_test.go
new file mode 100644
index 0000000..278a02a
--- /dev/null
+++ b/storage/persistence/zookeeper/zookeeper_test.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package zookeeper
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks-controller/util"
+
+ "github.com/stretchr/testify/require"
+)
+
+// addr is the Zookeeper server address the tests in this file connect to.
+const addr = "127.0.0.1:2181"
+
+// TestBasicOperations exercises Set, Get, List and Delete through an instance
+// connected to the Zookeeper server at addr, using a random election path.
+func TestBasicOperations(t *testing.T) {
+ id := util.RandString(40)
+ testElectPath := "/" + util.RandString(8) + "/" + util.RandString(8)
+ persist, err := New(id, &Config{
+ ElectPath: testElectPath,
+ Addrs: []string{addr},
+ })
+ require.NoError(t, err)
+ defer persist.Close()
+ // Drain leader-change notifications so the sender is never left waiting
+ // for a receiver on the unbuffered channel.
+ go func() {
+ for range persist.LeaderChange() {
+ // do nothing
+ }
+ }()
+
+ ctx := context.Background()
+ keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
+ value := []byte("v")
+ // Every key must be readable with the value that was just written.
+ for _, key := range keys {
+ require.NoError(t, persist.Set(ctx, key, value))
+ gotValue, err := persist.Get(ctx, key)
+ require.NoError(t, err)
+ require.Equal(t, value, gotValue)
+ }
+ // Listing the shared parent must return exactly the keys written above.
+ entries, err := persist.List(ctx, "/a/b")
+ require.NoError(t, err)
+ require.Equal(t, len(keys), len(entries))
+ for _, key := range keys {
+ require.NoError(t, persist.Delete(ctx, key))
+ }
+}
+
+// TestElect checks leader election between two instances sharing one random
+// election path: node0 becomes leader first, node1 observes node0 as leader,
+// and node1 takes over after node0 is closed.
+func TestElect(t *testing.T) {
+ endpoints := []string{addr}
+
+ testElectPath := "/" + util.RandString(8) + "/" + util.RandString(8)
+ id0 := util.RandString(40)
+ node0, err := New(id0, &Config{
+ ElectPath: testElectPath,
+ Addrs: endpoints,
+ })
+ require.NoError(t, err)
+ require.Eventuallyf(t, func() bool {
+ return node0.Leader() == node0.myID
+ }, 10*time.Second, 100*time.Millisecond, "node0 should be the leader")
+
+ id1 := util.RandString(40)
+ node1, err := New(id1, &Config{
+ ElectPath: testElectPath,
+ Addrs: endpoints,
+ })
+ require.NoError(t, err)
+ require.Eventuallyf(t, func() bool {
+ return node1.Leader() == node0.myID
+ }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the
node0")
+
+ // Drain both leader-change channels for the rest of the test; this
+ // goroutine has no exit condition.
+ go func() {
+ for {
+ select {
+ case <-node0.LeaderChange():
+ // do nothing
+ case <-node1.LeaderChange():
+ // do nothing
+ }
+ }
+ }()
+
+ // Closing node0 should let node1 take over the election node.
+ require.NoError(t, node0.Close())
+
+ require.Eventuallyf(t, func() bool {
+ return node1.Leader() == node1.myID
+ }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
+ require.NoError(t, node1.Close())
+}