This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 67e45f6  Fix missing yaml tag in raft config and List API didn't work correctly (#228)
67e45f6 is described below

commit 67e45f6198167e00a5021236e343b8ca9c9282ed
Author: hulk <[email protected]>
AuthorDate: Tue Dec 10 09:48:55 2024 +0800

    Fix missing yaml tag in raft config and List API didn't work correctly (#228)
---
 cmd/server/main.go              | 12 ++++---
 server/server.go                | 11 ++++---
 store/engine/raft/config.go     | 15 ++++-----
 store/engine/raft/logger.go     | 41 ++++++++++++++++++++++++
 store/engine/raft/node.go       | 69 ++++++++++++++++++-----------------------
 store/engine/raft/node_test.go  |  2 +-
 store/engine/raft/store.go      | 51 +++++++++++++++++++++++++-----
 store/engine/raft/store_test.go | 26 +++++++++++-----
 8 files changed, 155 insertions(+), 72 deletions(-)

diff --git a/cmd/server/main.go b/cmd/server/main.go
index 461107b..614f184 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -41,13 +41,13 @@ func init() {
        flag.StringVar(&configPath, "c", "config/config.yaml", "set config yaml file path")
 }
 
-func registerSignal(shutdown chan struct{}) {
+func registerSignal(closeFn func()) {
        c := make(chan os.Signal, 1)
        signal.Notify(c, []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1}...)
        go func() {
                for sig := range c {
                        if handleSignals(sig) {
-                               close(shutdown)
+                               closeFn()
                                return
                        }
                }
@@ -65,10 +65,13 @@ func handleSignals(sig os.Signal) (exitNow bool) {
 }
 
 func main() {
+       ctx, cancelFn := context.WithCancel(context.Background())
        // os signal handler
        shutdownCh := make(chan struct{})
-       registerSignal(shutdownCh)
-       ctx, cancelFn := context.WithCancel(context.Background())
+       registerSignal(func() {
+               close(shutdownCh)
+               cancelFn()
+       })
 
        flag.Parse()
 
@@ -101,7 +104,6 @@ func main() {
 
        // wait for the term signal
        <-shutdownCh
-       cancelFn()
        if err := srv.Stop(); err != nil {
                logger.Get().With(zap.Error(err)).Error("Failed to close the server")
        } else {
diff --git a/server/server.go b/server/server.go
index fc99e43..f5f6d82 100644
--- a/server/server.go
+++ b/server/server.go
@@ -22,6 +22,7 @@ package server
 
 import (
        "context"
+       "errors"
        "fmt"
        "net/http"
        "net/http/pprof"
@@ -77,11 +78,8 @@ func NewServer(cfg *config.Config) (*Server, error) {
        if persist == nil {
                return nil, fmt.Errorf("no found any store config")
        }
-       clusterStore := store.NewClusterStore(persist)
-       if ok := clusterStore.IsReady(context.Background()); !ok {
-               return nil, fmt.Errorf("the cluster store is not ready")
-       }
 
+       clusterStore := store.NewClusterStore(persist)
        ctrl, err := controller.New(clusterStore, cfg.Controller)
        if err != nil {
                return nil, err
@@ -103,7 +101,7 @@ func (srv *Server) startAPIServer() {
        }
        go func() {
                if err := httpServer.ListenAndServe(); err != nil {
-                       if err == http.ErrServerClosed {
+                       if errors.Is(err, http.ErrServerClosed) {
                                return
                        }
                        panic(fmt.Errorf("API server: %w", err))
@@ -128,6 +126,9 @@ func PProf(c *gin.Context) {
 }
 
 func (srv *Server) Start(ctx context.Context) error {
+       if ok := srv.store.IsReady(ctx); !ok {
+               return fmt.Errorf("the cluster store is not ready")
+       }
        if err := srv.controller.Start(ctx); err != nil {
                return err
        }
diff --git a/store/engine/raft/config.go b/store/engine/raft/config.go
index 3ddf059..7a3b52e 100644
--- a/store/engine/raft/config.go
+++ b/store/engine/raft/config.go
@@ -24,16 +24,17 @@ import "errors"
 
 type Config struct {
        // ID is the identity of the local raft. ID cannot be 0.
-       ID uint64
+       ID uint64 `yaml:"id"`
        // DataDir is the directory to store the raft data which includes snapshot and WALs.
-       DataDir string
+       DataDir string `yaml:"data_dir"`
        // Join should be set to true if the node is joining an existing cluster.
-       Join bool
-
+       Join bool `yaml:"join"`
        // Peers is the list of raft peers.
-       Peers            []string
-       HeartbeatSeconds int
-       ElectionSeconds  int
+       Peers []string `yaml:"peers"`
+       // HeartbeatSeconds is the interval to send heartbeat message. Default is 2 seconds.
+       HeartbeatSeconds int `yaml:"heartbeat_seconds"`
+       // ElectionSeconds is the interval to start an election. Default is 10 * HeartBeat.
+       ElectionSeconds int `yaml:"election_seconds"`
 }
 
 func (c *Config) validate() error {
diff --git a/store/engine/raft/logger.go b/store/engine/raft/logger.go
new file mode 100644
index 0000000..db61748
--- /dev/null
+++ b/store/engine/raft/logger.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package raft
+
+import (
+       "go.etcd.io/etcd/raft/v3"
+       "go.uber.org/zap"
+)
+
+var _ raft.Logger = &Logger{}
+
+// Logger is a wrapper around zap.SugaredLogger to implement the raft.Logger interface.
+type Logger struct {
+       *zap.SugaredLogger
+}
+
+func (r Logger) Warning(v ...interface{}) {
+       r.SugaredLogger.Warn(v...)
+}
+
+func (r Logger) Warningf(format string, v ...interface{}) {
+       r.SugaredLogger.Warnf(format, v...)
+}
diff --git a/store/engine/raft/node.go b/store/engine/raft/node.go
index 3eed408..dcfa859 100644
--- a/store/engine/raft/node.go
+++ b/store/engine/raft/node.go
@@ -152,13 +152,18 @@ func (n *Node) run() error {
                MaxInflightMsgs: 128,
                MaxSizePerMsg:   10 * 1024 * 1024, // 10 MiB
                Storage:         n.dataStore.raftStorage,
+               Logger:          Logger{SugaredLogger: n.logger.Sugar()},
        }
 
        // WAL existing check must be done before replayWAL since it will create a new WAL if not exists
        walExists := n.dataStore.walExists()
-       if err := n.dataStore.replayWAL(); err != nil {
+       snapshot, err := n.dataStore.replayWAL()
+       if err != nil {
                return err
        }
+       n.appliedIndex = snapshot.Metadata.Index
+       n.snapshotIndex = snapshot.Metadata.Index
+       n.confState = snapshot.Metadata.ConfState
 
        if n.config.Join || walExists {
                n.raftNode = raft.RestartNode(raftConfig)
@@ -220,19 +225,6 @@ func (n *Node) runTransport() error {
 }
 
 func (n *Node) runRaftMessages() error {
-       snapshot, err := n.dataStore.loadSnapshotFromDisk()
-       if err != nil {
-               return err
-       }
-
-       // Load the snapshot into the key-value store.
-       if err := n.dataStore.reloadSnapshot(); err != nil {
-               return err
-       }
-       n.appliedIndex = snapshot.Metadata.Index
-       n.snapshotIndex = snapshot.Metadata.Index
-       n.confState = snapshot.Metadata.ConfState
-
        n.wg.Add(1)
        go func() {
                ticker := time.NewTicker(100 * time.Millisecond)
@@ -357,8 +349,28 @@ func (n *Node) GetRaftLead() uint64 {
        return n.raftNode.Status().Lead
 }
 
-func (n *Node) IsReady(_ context.Context) bool {
-       return n.raftNode.Status().Lead != raft.None
+func (n *Node) IsReady(ctx context.Context) bool {
+       tries := 0
+       for {
+               select {
+               case <-n.shutdown:
+                       return false
+               case <-time.After(200 * time.Millisecond):
+                       // wait for the leader to be elected
+                       if n.GetRaftLead() != raft.None {
+                               return true
+                       }
+
+                       tries++
+                       if tries >= 10 {
+                               // waiting too long, just return the running status
+                               n.logger.Warn("Leader not elected, return the running status")
+                               return n.isRunning.Load()
+                       }
+               case <-ctx.Done():
+                       return false
+               }
+       }
 }
 
 func (n *Node) LeaderChange() <-chan bool {
@@ -392,8 +404,7 @@ func (n *Node) Delete(ctx context.Context, key string) error {
 }
 
 func (n *Node) List(_ context.Context, prefix string) ([]engine.Entry, error) {
-       n.dataStore.List(prefix)
-       return nil, nil
+       return n.dataStore.List(prefix), nil
 }
 
 func (n *Node) applySnapshot(snapshot raftpb.Snapshot) error {
@@ -437,27 +448,7 @@ func (n *Node) applyEntries(entries []raftpb.Entry) {
 func (n *Node) applyEntry(entry raftpb.Entry) error {
        switch entry.Type {
        case raftpb.EntryNormal:
-               // apply entry to the state machine
-               if len(entry.Data) == 0 {
-                       // empty message, skip it.
-                       return nil
-               }
-
-               var e Event
-               if err := json.Unmarshal(entry.Data, &e); err != nil {
-                       return err
-               }
-               switch e.Op {
-               case opSet:
-                       n.dataStore.Set(e.Key, e.Value)
-                       return nil
-               case opDelete:
-                       n.dataStore.Delete(e.Key)
-               case opGet:
-                       // do nothing
-               default:
-                       return fmt.Errorf("unknown operation type: %d", e.Op)
-               }
+               return n.dataStore.applyDataEntry(entry)
        case raftpb.EntryConfChangeV2, raftpb.EntryConfChange:
                // apply config change to the state machine
                var cc raftpb.ConfChange
diff --git a/store/engine/raft/node_test.go b/store/engine/raft/node_test.go
index cb60cb0..1960161 100644
--- a/store/engine/raft/node_test.go
+++ b/store/engine/raft/node_test.go
@@ -185,7 +185,7 @@ func TestCluster_MultiNodes(t *testing.T) {
                require.Eventually(t, func() bool {
                        got, _ := n2.Get(ctx, "foo")
                        return string(got) == "bar"
-               }, 1*time.Second, 100*time.Millisecond)
+               }, 10*time.Second, 100*time.Millisecond)
        })
 
        t.Run("works well if 1/3 nodes down", func(t *testing.T) {
diff --git a/store/engine/raft/store.go b/store/engine/raft/store.go
index 4834a31..fd3f773 100644
--- a/store/engine/raft/store.go
+++ b/store/engine/raft/store.go
@@ -25,6 +25,7 @@ import (
        "errors"
        "fmt"
        "os"
+       "slices"
        "strings"
        "sync"
 
@@ -135,32 +136,41 @@ func (ds *DataStore) openWAL(snapshot *raftpb.Snapshot) (*wal.WAL, error) {
        return wal.Open(logger.Get(), ds.walDir, walSnapshot)
 }
 
-func (ds *DataStore) replayWAL() error {
+func (ds *DataStore) replayWAL() (*raftpb.Snapshot, error) {
        snapshot, err := ds.loadSnapshotFromDisk()
        if err != nil {
-               return fmt.Errorf("failed to load newest snapshot: %w", err)
+               return nil, fmt.Errorf("failed to load newest snapshot: %w", err)
        }
 
        w, err := ds.openWAL(snapshot)
        if err != nil {
-               return fmt.Errorf("failed to open WAL: %w", err)
+               return nil, fmt.Errorf("failed to open WAL: %w", err)
        }
        ds.wal = w
 
        _, hardState, entries, err := w.ReadAll()
        if err != nil {
-               return fmt.Errorf("failed to read WAL: %w", err)
+               return nil, fmt.Errorf("failed to read WAL: %w", err)
        }
        if snapshot != nil {
                _ = ds.raftStorage.ApplySnapshot(*snapshot)
        }
        if err := ds.raftStorage.SetHardState(hardState); err != nil {
-               return fmt.Errorf("failed to set hard state: %w", err)
+               return nil, fmt.Errorf("failed to set hard state: %w", err)
        }
        if err := ds.raftStorage.Append(entries); err != nil {
-               return fmt.Errorf("failed to append entries: %w", err)
+               return nil, fmt.Errorf("failed to append entries: %w", err)
        }
-       return nil
+
+       if err := ds.reloadSnapshot(); err != nil {
+               return nil, fmt.Errorf("failed to reload snapshot: %w", err)
+       }
+       for _, entry := range entries {
+               if err := ds.applyDataEntry(entry); err != nil {
+                       return nil, fmt.Errorf("failed to apply data entry: %w", err)
+               }
+       }
+       return snapshot, nil
 }
 
 func (ds *DataStore) saveSnapshot(snapshot raftpb.Snapshot) error {
@@ -178,6 +188,28 @@ func (ds *DataStore) saveSnapshot(snapshot raftpb.Snapshot) error {
        return ds.wal.ReleaseLockTo(snapshot.Metadata.Index)
 }
 
+func (ds *DataStore) applyDataEntry(entry raftpb.Entry) error {
+       if entry.Type != raftpb.EntryNormal || len(entry.Data) == 0 {
+               return nil
+       }
+
+       var e Event
+       if err := json.Unmarshal(entry.Data, &e); err != nil {
+               return err
+       }
+       switch e.Op {
+       case opSet:
+               ds.Set(e.Key, e.Value)
+       case opDelete:
+               ds.Delete(e.Key)
+       case opGet:
+               // do nothing
+       default:
+               return fmt.Errorf("unknown operation type: %d", e.Op)
+       }
+       return nil
+}
+
 func (ds *DataStore) Set(key string, value []byte) {
        ds.mu.Lock()
        defer ds.mu.Unlock()
@@ -206,11 +238,14 @@ func (ds *DataStore) List(prefix string) []engine.Entry {
        for k := range ds.kvs {
                if strings.HasPrefix(k, prefix) {
                        entries = append(entries, engine.Entry{
-                               Key:   strings.TrimPrefix(k, prefix),
+                               Key:   strings.TrimLeft(strings.TrimPrefix(k, prefix), "/"),
                                Value: ds.kvs[k],
                        })
                }
        }
+       slices.SortFunc(entries, func(i, j engine.Entry) int {
+               return strings.Compare(i.Key, j.Key)
+       })
        return entries
 }
 
diff --git a/store/engine/raft/store_test.go b/store/engine/raft/store_test.go
index e16e2ae..de378dd 100644
--- a/store/engine/raft/store_test.go
+++ b/store/engine/raft/store_test.go
@@ -21,6 +21,8 @@
 package raft
 
 import (
+       "encoding/json"
+       "fmt"
        "os"
        "testing"
 
@@ -38,19 +40,23 @@ func TestDataStore(t *testing.T) {
                os.RemoveAll(dir)
        }()
 
-       err := store.replayWAL()
+       _, err := store.replayWAL()
        require.NoError(t, err)
 
        t.Run("reply WAL from the disk", func(t *testing.T) {
-               require.NoError(t, store.wal.Save(raftpb.HardState{Term: 1, Vote: 1}, []raftpb.Entry{
-                       {Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("test-1")},
-                       {Term: 1, Index: 2, Type: raftpb.EntryNormal, Data: []byte("test-2")},
-                       {Term: 1, Index: 3, Type: raftpb.EntryNormal, Data: []byte("test-3")},
-               }))
+               entries := make([]raftpb.Entry, 0)
+               for i := 0; i < 3; i++ {
+                       payload, err := json.Marshal(Event{Op: opSet, Key: fmt.Sprintf("key-%d", i), Value: []byte(fmt.Sprintf("value-%d", i))})
+                       require.NoError(t, err)
+                       entries = append(entries, raftpb.Entry{Term: 1, Index: uint64(i + 1), Type: raftpb.EntryNormal, Data: payload})
+               }
+               require.NoError(t, store.wal.Save(raftpb.HardState{Term: 1, Vote: 1}, entries))
                store.Close()
 
                store = NewDataStore(dir)
-               require.NoError(t, store.replayWAL())
+               snapshot, err := store.replayWAL()
+               require.NoError(t, err)
+               require.NotNil(t, snapshot)
 
                firstIndex, err := store.raftStorage.FirstIndex()
                require.NoError(t, err)
@@ -63,6 +69,12 @@ func TestDataStore(t *testing.T) {
                term, err := store.raftStorage.Term(1)
                require.NoError(t, err)
                require.EqualValues(t, 1, term)
+
+               for i := 0; i < 3; i++ {
+                       v, err := store.Get(fmt.Sprintf("key-%d", i))
+                       require.NoError(t, err)
+                       require.Equal(t, []byte(fmt.Sprintf("value-%d", i)), v)
+               }
        })
 
        t.Run("Basic GET/SET/DELETE/LIST", func(t *testing.T) {

Reply via email to