This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new d77ea0f  Add the support of using Apache Zookeeper as an alternative storage (#126)
d77ea0f is described below

commit d77ea0f2fe864d5a1466b3fb9c64bcb828a2b32f
Author: 纪华裕 <[email protected]>
AuthorDate: Sat Dec 2 22:21:02 2023 +0800

    Add the support of using Apache Zookeeper as an alternative storage (#126)
---
 config/config.go                                |  11 +-
 config/config.yaml                              |  14 ++
 go.mod                                          |   1 +
 go.sum                                          |   2 +
 scripts/docker/docker-compose.yml               |   6 +
 server/server.go                                |  21 +-
 storage/persistence/zookeeper/zookeeper.go      | 271 ++++++++++++++++++++++++
 storage/persistence/zookeeper/zookeeper_test.go | 107 ++++++++++
 8 files changed, 428 insertions(+), 5 deletions(-)

diff --git a/config/config.go b/config/config.go
index 656ee80..c0370df 100644
--- a/config/config.go
+++ b/config/config.go
@@ -26,6 +26,7 @@ import (
        "os"
 
        "github.com/apache/kvrocks-controller/storage/persistence/etcd"
+       "github.com/apache/kvrocks-controller/storage/persistence/zookeeper"
        "github.com/go-playground/validator/v10"
 )
 
@@ -48,10 +49,12 @@ type ControllerConfig struct {
 const defaultPort = 9379
 
 type Config struct {
-       Addr       string            `yaml:"addr"`
-       Etcd       *etcd.Config      `yaml:"etcd"`
-       Admin      AdminConfig       `yaml:"admin"`
-       Controller *ControllerConfig `yaml:"controller"`
+       Addr        string            `yaml:"addr"`
+       StorageType string            `yaml:"storage_type"`
+       Etcd        *etcd.Config      `yaml:"etcd"`
+       Zookeeper   *zookeeper.Config `yaml:"zookeeper"`
+       Admin       AdminConfig       `yaml:"admin"`
+       Controller  *ControllerConfig `yaml:"controller"`
 }
 
 func Default() *Config {
diff --git a/config/config.yaml b/config/config.yaml
index ad7b24b..d09fdf4 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -18,17 +18,31 @@
 
 addr: "127.0.0.1:9379"
 
+
+# Which storage engine should be used by controller
+# options: etcd, zookeeper
+# default: etcd
+storage_type: etcd
+
 etcd:
   addrs:
     - "127.0.0.1:2379"
   username:
   password:
+  elect_path:
   tls:
     enable: false
     cert_file:
     key_file:
     ca_file:
 
+zookeeper:
+  addrs:
+    - "127.0.0.1:2181"
+  scheme:
+  auth:
+  elect_path:
+
 controller:
   failover:
     gc_interval_seconds: 3600
diff --git a/go.mod b/go.mod
index 1ba9483..364a538 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
        github.com/go-playground/validator/v10 v10.9.0
        github.com/go-redis/redis/v8 v8.11.5
        github.com/go-resty/resty/v2 v2.7.0
+       github.com/go-zookeeper/zk v1.0.3
        github.com/google/uuid v1.3.0
        github.com/jedib0t/go-pretty/v6 v6.4.6
        github.com/prometheus/client_golang v1.11.1
diff --git a/go.sum b/go.sum
index 233ddae..9c55cf5 100644
--- a/go.sum
+++ b/go.sum
@@ -72,6 +72,8 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod 
h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq
 github.com/go-resty/resty/v2 v2.7.0 
h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
 github.com/go-resty/resty/v2 v2.7.0/go.mod 
h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-zookeeper/zk v1.0.3 
h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
+github.com/go-zookeeper/zk v1.0.3/go.mod 
h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/godbus/dbus/v5 v5.0.4/go.mod 
h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
diff --git a/scripts/docker/docker-compose.yml 
b/scripts/docker/docker-compose.yml
index c681c05..d483760 100644
--- a/scripts/docker/docker-compose.yml
+++ b/scripts/docker/docker-compose.yml
@@ -35,3 +35,9 @@ services:
       - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
       - ETCD_INITIAL_CLUSTER=etcd0=http://etcd0:2380
       - ETCD_INITIAL_CLUSTER_STATE=new
+  zookeeper0:
+    image: "zookeeper:latest"
+    container_name: zookeeper0
+    ports:
+      - "2181:2181"
+
diff --git a/server/server.go b/server/server.go
index f460c09..336d647 100644
--- a/server/server.go
+++ b/server/server.go
@@ -24,13 +24,17 @@ import (
        "fmt"
        "net/http"
        "net/http/pprof"
+       "strings"
        "time"
 
        "github.com/apache/kvrocks-controller/config"
        "github.com/apache/kvrocks-controller/controller"
        "github.com/apache/kvrocks-controller/controller/probe"
+       "github.com/apache/kvrocks-controller/logger"
        "github.com/apache/kvrocks-controller/storage"
+       "github.com/apache/kvrocks-controller/storage/persistence"
        "github.com/apache/kvrocks-controller/storage/persistence/etcd"
+       "github.com/apache/kvrocks-controller/storage/persistence/zookeeper"
        "github.com/gin-gonic/gin"
 )
 
@@ -44,11 +48,26 @@ type Server struct {
 }
 
 func NewServer(cfg *config.Config) (*Server, error) {
+       var persist persistence.Persistence
+       var err error
+       switch {
+       case strings.EqualFold(cfg.StorageType, "etcd"):
+               logger.Get().Info("Use Etcd as storage")
+               persist, err = etcd.New(cfg.Addr, cfg.Etcd)
+       case strings.EqualFold(cfg.StorageType, "zookeeper"):
+               logger.Get().Info("Use Zookeeper as storage")
+               persist, err = zookeeper.New(cfg.Addr, cfg.Zookeeper)
+       default:
+               logger.Get().Info("Use Etcd as default storage")
+               persist, err = etcd.New(cfg.Addr, cfg.Etcd)
+       }
 
-       persist, err := etcd.New(cfg.Addr, cfg.Etcd)
        if err != nil {
                return nil, err
        }
+       if persist == nil {
+               return nil, fmt.Errorf("no found any storage config")
+       }
        storage, err := storage.NewStorage(persist)
        if err != nil {
                return nil, err
diff --git a/storage/persistence/zookeeper/zookeeper.go 
b/storage/persistence/zookeeper/zookeeper.go
new file mode 100644
index 0000000..5b00a6e
--- /dev/null
+++ b/storage/persistence/zookeeper/zookeeper.go
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package zookeeper
+
+import (
+       "context"
+       "errors"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/apache/kvrocks-controller/logger"
+       "github.com/apache/kvrocks-controller/storage/persistence"
+       "github.com/go-zookeeper/zk"
+       "go.uber.org/atomic"
+)
+
+const (
+       sessionTTL = 6 * time.Second // session timeout handed to zk.Connect; electLoop retries after a third of it
+)
+
+const defaultElectPath = "/kvrocks/controller/leader" // election node path used by New when Config.ElectPath is empty
+
+type Config struct { // Config carries the YAML-configurable settings consumed by New.
+       Addrs     []string `yaml:"addrs"`      // server addresses passed to zk.Connect
+       Scheme    string   `yaml:"scheme"`     // auth scheme for AddAuth; only applied when Auth is non-empty too
+       Auth      string   `yaml:"auth"`       // auth payload for AddAuth, also used as the ACL ID of created nodes
+       ElectPath string   `yaml:"elect_path"` // election node path; empty selects defaultElectPath
+}
+
+// Zookeeper is a ZooKeeper-backed store that also takes part in leader
+// election through an ephemeral node at electPath.
+type Zookeeper struct {
+       // conn is the session used for every ZooKeeper request.
+       conn *zk.Conn
+       // acl is set on every node this instance creates.
+       acl []zk.ACL
+       // leaderMu guards leaderID.
+       leaderMu sync.RWMutex
+       // leaderID is the content last read from the election node.
+       leaderID string
+       // myID is the identifier written into the election node by this instance.
+       myID string
+       // electPath is the path of the election node.
+       electPath string
+       // isReady flips to true once a leader has been observed for the first time.
+       isReady atomic.Bool
+       // quitCh is closed by Close to stop the election loop.
+       quitCh chan struct{}
+       // leaderChangeCh receives a value whenever leaderID changes.
+       leaderChangeCh chan bool
+       // wg tracks the election goroutine so Close can wait for it.
+       wg sync.WaitGroup
+}
+
+// New connects to the ZooKeeper servers listed in cfg and starts the
+// background leader-election loop for the node identified by id.
+//
+// It returns an error when id is empty, cfg is nil, or zk.Connect fails.
+// When both Scheme and Auth are configured the credentials are added to the
+// session and used as the ACL for created nodes; if AddAuth fails a warning
+// is logged and the world ACL is kept (best effort).
+func New(id string, cfg *Config) (*Zookeeper, error) {
+       if len(id) == 0 {
+               return nil, errors.New("id must NOT be a empty string")
+       }
+       // A missing "zookeeper" section in the config file leaves cfg nil.
+       if cfg == nil {
+               return nil, errors.New("zookeeper config must NOT be nil")
+       }
+       conn, _, err := zk.Connect(cfg.Addrs, sessionTTL)
+       if err != nil {
+               return nil, err
+       }
+       electPath := defaultElectPath
+       if cfg.ElectPath != "" {
+               electPath = cfg.ElectPath
+       }
+       acl := zk.WorldACL(zk.PermAll)
+       if cfg.Scheme != "" && cfg.Auth != "" {
+               err := conn.AddAuth(cfg.Scheme, []byte(cfg.Auth))
+               if err == nil {
+                       acl = []zk.ACL{{Perms: zk.PermAll, Scheme: cfg.Scheme, ID: cfg.Auth}}
+               } else {
+                       logger.Get().Warn("Zookeeper addAuth fail: " + err.Error())
+               }
+       }
+       e := &Zookeeper{
+               myID:           id,
+               acl:            acl,
+               electPath:      electPath,
+               conn:           conn,
+               quitCh:         make(chan struct{}),
+               leaderChangeCh: make(chan bool),
+               wg:             sync.WaitGroup{},
+       }
+       e.isReady.Store(false)
+       // The election goroutine is released by Close through quitCh and wg.
+       e.wg.Add(1)
+       go e.electLoop(context.Background())
+       return e, nil
+}
+
+// ID reports the identifier this instance was created with.
+func (e *Zookeeper) ID() string {
+       self := e.myID
+       return self
+}
+
+// Leader returns the most recently recorded leader ID, read under the
+// leader read lock.
+func (e *Zookeeper) Leader() string {
+       e.leaderMu.RLock()
+       current := e.leaderID
+       e.leaderMu.RUnlock()
+       return current
+}
+
+func (e *Zookeeper) LeaderChange() <-chan bool {
+       return e.leaderChangeCh
+}
+
+// IsReady blocks until a leader has been observed and then returns true.
+// It returns false once the instance has been closed, and the current
+// readiness state when ctx is done first.
+func (e *Zookeeper) IsReady(ctx context.Context) bool {
+       // A closed instance is never ready.
+       select {
+       case <-e.quitCh:
+               return false
+       default:
+       }
+       // Fast path: skip the first polling interval when already ready.
+       if e.isReady.Load() {
+               return true
+       }
+       // One ticker for the whole wait instead of a fresh timer per iteration.
+       ticker := time.NewTicker(100 * time.Millisecond)
+       defer ticker.Stop()
+       for {
+               select {
+               case <-e.quitCh:
+                       return false
+               case <-ticker.C:
+                       if e.isReady.Load() {
+                               return true
+                       }
+               case <-ctx.Done():
+                       return e.isReady.Load()
+               }
+       }
+}
+
+// Get reads the data stored at key. A missing node is not an error: it
+// yields a nil slice and a nil error.
+func (e *Zookeeper) Get(ctx context.Context, key string) ([]byte, error) {
+       value, _, err := e.conn.Get(key)
+       switch {
+       case err == nil:
+               return value, nil
+       case errors.Is(err, zk.ErrNoNode):
+               // Key does not exist
+               return nil, nil
+       default:
+               return nil, err
+       }
+}
+
+// Exists reports whether a node is present at key. Whenever the lookup
+// fails the result is false together with the error.
+func (e *Zookeeper) Exists(ctx context.Context, key string) (bool, error) {
+       found, _, err := e.conn.Exists(key)
+       if err == nil {
+               return found, nil
+       }
+       return false, err
+}
+
+// Set stores value at key: an existing node is overwritten regardless of
+// its version, a missing node is created as a persistent node (parents
+// included). An error from the existence check is returned to the caller
+// instead of being treated as "node missing".
+func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error {
+       exist, err := e.Exists(ctx, key)
+       if err != nil {
+               return err
+       }
+       if !exist {
+               return e.Create(ctx, key, value, 0)
+       }
+       // Version -1 matches any version of the node.
+       _, err = e.conn.Set(key, value, -1)
+       return err
+}
+
+// Create creates the node at key with the given value and flags, using the
+// ACL chosen in New. Missing parent nodes are created first, recursively,
+// as empty persistent nodes.
+//
+// The error of the final create is returned unchanged, so callers can test
+// it for zk.ErrNodeExists.
+func (e *Zookeeper) Create(ctx context.Context, key string, value []byte, flags int32) error {
+       // An index of 0 means the parent is the root, which always exists.
+       if lastSlashIndex := strings.LastIndex(key, "/"); lastSlashIndex > 0 {
+               parent := key[:lastSlashIndex]
+               exist, err := e.Exists(ctx, parent)
+               if err != nil {
+                       return err
+               }
+               if !exist {
+                       // Another client may create the parent between the check and
+                       // this call; an already existing parent is fine.
+                       err := e.Create(ctx, parent, []byte{}, 0)
+                       if err != nil && !errors.Is(err, zk.ErrNodeExists) {
+                               return err
+                       }
+               }
+       }
+       _, err := e.conn.Create(key, value, flags, e.acl)
+       return err
+}
+
+// Delete removes the node at key regardless of its version. Deleting a
+// node that does not exist succeeds.
+func (e *Zookeeper) Delete(ctx context.Context, key string) error {
+       if err := e.conn.Delete(key, -1); err != nil && !errors.Is(err, zk.ErrNoNode) {
+               return err
+       }
+       // Either the delete succeeded or the key did not exist.
+       return nil
+}
+
+// List returns one entry per direct child of prefix: Key is the child name
+// (not the full path) and Value the data stored in the child. A missing
+// prefix node yields an empty, non-nil slice.
+func (e *Zookeeper) List(ctx context.Context, prefix string) ([]persistence.Entry, error) {
+       children, _, err := e.conn.Children(prefix)
+       if errors.Is(err, zk.ErrNoNode) {
+               return []persistence.Entry{}, nil
+       }
+       if err != nil {
+               return nil, err
+       }
+
+       // Avoid a double slash in child paths when prefix ends with "/".
+       parent := strings.TrimSuffix(prefix, "/")
+       entries := make([]persistence.Entry, 0, len(children))
+       for _, child := range children {
+               data, _, err := e.conn.Get(parent + "/" + child)
+               if errors.Is(err, zk.ErrNoNode) {
+                       // The child was removed after Children returned; skip it.
+                       continue
+               }
+               if err != nil {
+                       return nil, err
+               }
+               entries = append(entries, persistence.Entry{
+                       Key:   child,
+                       Value: data,
+               })
+       }
+
+       return entries, nil
+}
+
+// SetleaderID records newLeaderID as the current leader. Empty or unchanged
+// IDs are ignored. On a change the instance is marked ready and a
+// notification is sent on the leader-change channel; the send is abandoned
+// when the instance is closed, so a missing receiver cannot block shutdown.
+func (e *Zookeeper) SetleaderID(newLeaderID string) {
+       if newLeaderID == "" {
+               return
+       }
+       // Compare and update under the same lock that Leader reads with.
+       e.leaderMu.Lock()
+       changed := newLeaderID != e.leaderID
+       if changed {
+               e.leaderID = newLeaderID
+       }
+       e.leaderMu.Unlock()
+       if !changed {
+               return
+       }
+       // Ready is published after leaderID, so a ready instance always has a leader.
+       e.isReady.Store(true)
+       select {
+       case e.leaderChangeCh <- true:
+       case <-e.quitCh:
+       }
+}
+
+func (e *Zookeeper) electLoop(ctx context.Context) {
+       defer e.wg.Done()
+reset:
+       select {
+       case <-e.quitCh:
+               return
+       default:
+       }
+       err := e.Create(ctx, e.electPath, []byte(e.myID), zk.FlagEphemeral)
+       if err != nil && !errors.Is(err, zk.ErrNodeExists) {
+               time.Sleep(sessionTTL / 3)
+               goto reset
+       }
+       data, _, ch, err := e.conn.GetW(e.electPath)
+       if err != nil {
+               time.Sleep(sessionTTL / 3)
+               goto reset
+       }
+       e.SetleaderID(string(data))
+
+       for {
+               select {
+               case resp := <-ch:
+                       if resp.Type == zk.EventNodeDeleted {
+                               err := e.Create(ctx, e.electPath, 
[]byte(e.myID), zk.FlagEphemeral)
+                               if err != nil && !errors.Is(err, 
zk.ErrNodeExists) {
+                                       time.Sleep(sessionTTL / 3)
+                                       goto reset
+                               }
+                       }
+                       data, _, ch, err = e.conn.GetW(e.electPath)
+                       if err != nil {
+                               time.Sleep(sessionTTL / 3)
+                               goto reset
+                       }
+                       e.SetleaderID(string(data))
+               case <-e.quitCh:
+                       logger.Get().Info(e.myID + " Exit the leader election 
loop")
+                       return
+               }
+
+       }
+
+}
+
+// Close stops the election loop, waits for it to finish and closes the
+// ZooKeeper connection. It always returns nil. A repeated call is a no-op;
+// concurrent calls are not synchronized with each other.
+func (e *Zookeeper) Close() error {
+       select {
+       case <-e.quitCh:
+               // Already closed: closing quitCh again would panic.
+               return nil
+       default:
+       }
+       close(e.quitCh)
+       e.wg.Wait()
+       e.conn.Close()
+       return nil
+}
diff --git a/storage/persistence/zookeeper/zookeeper_test.go 
b/storage/persistence/zookeeper/zookeeper_test.go
new file mode 100644
index 0000000..278a02a
--- /dev/null
+++ b/storage/persistence/zookeeper/zookeeper_test.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package zookeeper
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks-controller/util"
+
+       "github.com/stretchr/testify/require"
+)
+
+const addr = "127.0.0.1:2181"
+
+// TestBasicOperations exercises Set, Get, List and Delete against the
+// ZooKeeper server at addr.
+func TestBasicOperations(t *testing.T) {
+       id := util.RandString(40)
+       testElectPath := "/" + util.RandString(8) + "/" + util.RandString(8)
+       persist, err := New(id, &Config{
+               ElectPath: testElectPath,
+               Addrs:     []string{addr},
+       })
+       require.NoError(t, err)
+
+       // Deferred calls run last-in-first-out: Close runs first while the
+       // drain goroutine is still receiving, then done stops the goroutine.
+       done := make(chan struct{})
+       defer close(done)
+       defer persist.Close()
+       go func() {
+               for {
+                       select {
+                       case <-persist.LeaderChange():
+                               // do nothing
+                       case <-done:
+                               return
+                       }
+               }
+       }()
+
+       ctx := context.Background()
+       keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
+       value := []byte("v")
+       for _, key := range keys {
+               require.NoError(t, persist.Set(ctx, key, value))
+               gotValue, err := persist.Get(ctx, key)
+               require.NoError(t, err)
+               require.Equal(t, value, gotValue)
+       }
+       entries, err := persist.List(ctx, "/a/b")
+       require.NoError(t, err)
+       require.Equal(t, len(keys), len(entries))
+       for _, key := range keys {
+               require.NoError(t, persist.Delete(ctx, key))
+       }
+}
+
+func TestElect(t *testing.T) {
+       endpoints := []string{addr}
+
+       testElectPath := "/" + util.RandString(8) + "/" + util.RandString(8)
+       id0 := util.RandString(40)
+       node0, err := New(id0, &Config{
+               ElectPath: testElectPath,
+               Addrs:     endpoints,
+       })
+       require.NoError(t, err)
+       require.Eventuallyf(t, func() bool {
+               return node0.Leader() == node0.myID
+       }, 10*time.Second, 100*time.Millisecond, "node0 should be the leader")
+
+       id1 := util.RandString(40)
+       node1, err := New(id1, &Config{
+               ElectPath: testElectPath,
+               Addrs:     endpoints,
+       })
+       require.NoError(t, err)
+       require.Eventuallyf(t, func() bool {
+               return node1.Leader() == node0.myID
+       }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the 
node0")
+
+       go func() {
+               for {
+                       select {
+                       case <-node0.LeaderChange():
+                               // do nothing
+                       case <-node1.LeaderChange():
+                               // do nothing
+                       }
+               }
+       }()
+
+       require.NoError(t, node0.Close())
+
+       require.Eventuallyf(t, func() bool {
+               return node1.Leader() == node1.myID
+       }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
+       require.NoError(t, node1.Close())
+}

Reply via email to