This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 67e45f6 Fix missing yaml tag in raft config and List API didn't work correctly (#228)
67e45f6 is described below
commit 67e45f6198167e00a5021236e343b8ca9c9282ed
Author: hulk <[email protected]>
AuthorDate: Tue Dec 10 09:48:55 2024 +0800
Fix missing yaml tag in raft config and List API didn't work correctly (#228)
---
cmd/server/main.go | 12 ++++---
server/server.go | 11 ++++---
store/engine/raft/config.go | 15 ++++-----
store/engine/raft/logger.go | 41 ++++++++++++++++++++++++
store/engine/raft/node.go | 69 ++++++++++++++++++-----------------------
store/engine/raft/node_test.go | 2 +-
store/engine/raft/store.go | 51 +++++++++++++++++++++++++-----
store/engine/raft/store_test.go | 26 +++++++++++-----
8 files changed, 155 insertions(+), 72 deletions(-)
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 461107b..614f184 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -41,13 +41,13 @@ func init() {
flag.StringVar(&configPath, "c", "config/config.yaml", "set config yaml
file path")
}
-func registerSignal(shutdown chan struct{}) {
+func registerSignal(closeFn func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, []os.Signal{syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGUSR1}...)
go func() {
for sig := range c {
if handleSignals(sig) {
- close(shutdown)
+ closeFn()
return
}
}
@@ -65,10 +65,13 @@ func handleSignals(sig os.Signal) (exitNow bool) {
}
func main() {
+ ctx, cancelFn := context.WithCancel(context.Background())
// os signal handler
shutdownCh := make(chan struct{})
- registerSignal(shutdownCh)
- ctx, cancelFn := context.WithCancel(context.Background())
+ registerSignal(func() {
+ close(shutdownCh)
+ cancelFn()
+ })
flag.Parse()
@@ -101,7 +104,6 @@ func main() {
// wait for the term signal
<-shutdownCh
- cancelFn()
if err := srv.Stop(); err != nil {
logger.Get().With(zap.Error(err)).Error("Failed to close the
server")
} else {
diff --git a/server/server.go b/server/server.go
index fc99e43..f5f6d82 100644
--- a/server/server.go
+++ b/server/server.go
@@ -22,6 +22,7 @@ package server
import (
"context"
+ "errors"
"fmt"
"net/http"
"net/http/pprof"
@@ -77,11 +78,8 @@ func NewServer(cfg *config.Config) (*Server, error) {
if persist == nil {
return nil, fmt.Errorf("no found any store config")
}
- clusterStore := store.NewClusterStore(persist)
- if ok := clusterStore.IsReady(context.Background()); !ok {
- return nil, fmt.Errorf("the cluster store is not ready")
- }
+ clusterStore := store.NewClusterStore(persist)
ctrl, err := controller.New(clusterStore, cfg.Controller)
if err != nil {
return nil, err
@@ -103,7 +101,7 @@ func (srv *Server) startAPIServer() {
}
go func() {
if err := httpServer.ListenAndServe(); err != nil {
- if err == http.ErrServerClosed {
+ if errors.Is(err, http.ErrServerClosed) {
return
}
panic(fmt.Errorf("API server: %w", err))
@@ -128,6 +126,9 @@ func PProf(c *gin.Context) {
}
func (srv *Server) Start(ctx context.Context) error {
+ if ok := srv.store.IsReady(ctx); !ok {
+ return fmt.Errorf("the cluster store is not ready")
+ }
if err := srv.controller.Start(ctx); err != nil {
return err
}
diff --git a/store/engine/raft/config.go b/store/engine/raft/config.go
index 3ddf059..7a3b52e 100644
--- a/store/engine/raft/config.go
+++ b/store/engine/raft/config.go
@@ -24,16 +24,17 @@ import "errors"
type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
- ID uint64
+ ID uint64 `yaml:"id"`
// DataDir is the directory to store the raft data which includes
snapshot and WALs.
- DataDir string
+ DataDir string `yaml:"data_dir"`
// Join should be set to true if the node is joining an existing
cluster.
- Join bool
-
+ Join bool `yaml:"join"`
// Peers is the list of raft peers.
- Peers []string
- HeartbeatSeconds int
- ElectionSeconds int
+ Peers []string `yaml:"peers"`
+ // HeartbeatSeconds is the interval to send heartbeat message. Default
is 2 seconds.
+ HeartbeatSeconds int `yaml:"heartbeat_seconds"`
+ // ElectionSeconds is the interval to start an election. Default is 10
* HeartBeat.
+ ElectionSeconds int `yaml:"election_seconds"`
}
func (c *Config) validate() error {
diff --git a/store/engine/raft/logger.go b/store/engine/raft/logger.go
new file mode 100644
index 0000000..db61748
--- /dev/null
+++ b/store/engine/raft/logger.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package raft
+
+import (
+ "go.etcd.io/etcd/raft/v3"
+ "go.uber.org/zap"
+)
+
+var _ raft.Logger = &Logger{}
+
+// Logger is a wrapper around zap.SugaredLogger to implement the raft.Logger
interface.
+type Logger struct {
+ *zap.SugaredLogger
+}
+
+func (r Logger) Warning(v ...interface{}) {
+ r.SugaredLogger.Warn(v...)
+}
+
+func (r Logger) Warningf(format string, v ...interface{}) {
+ r.SugaredLogger.Warnf(format, v...)
+}
diff --git a/store/engine/raft/node.go b/store/engine/raft/node.go
index 3eed408..dcfa859 100644
--- a/store/engine/raft/node.go
+++ b/store/engine/raft/node.go
@@ -152,13 +152,18 @@ func (n *Node) run() error {
MaxInflightMsgs: 128,
MaxSizePerMsg: 10 * 1024 * 1024, // 10 MiB
Storage: n.dataStore.raftStorage,
+ Logger: Logger{SugaredLogger: n.logger.Sugar()},
}
// WAL existing check must be done before replayWAL since it will
create a new WAL if not exists
walExists := n.dataStore.walExists()
- if err := n.dataStore.replayWAL(); err != nil {
+ snapshot, err := n.dataStore.replayWAL()
+ if err != nil {
return err
}
+ n.appliedIndex = snapshot.Metadata.Index
+ n.snapshotIndex = snapshot.Metadata.Index
+ n.confState = snapshot.Metadata.ConfState
if n.config.Join || walExists {
n.raftNode = raft.RestartNode(raftConfig)
@@ -220,19 +225,6 @@ func (n *Node) runTransport() error {
}
func (n *Node) runRaftMessages() error {
- snapshot, err := n.dataStore.loadSnapshotFromDisk()
- if err != nil {
- return err
- }
-
- // Load the snapshot into the key-value store.
- if err := n.dataStore.reloadSnapshot(); err != nil {
- return err
- }
- n.appliedIndex = snapshot.Metadata.Index
- n.snapshotIndex = snapshot.Metadata.Index
- n.confState = snapshot.Metadata.ConfState
-
n.wg.Add(1)
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
@@ -357,8 +349,28 @@ func (n *Node) GetRaftLead() uint64 {
return n.raftNode.Status().Lead
}
-func (n *Node) IsReady(_ context.Context) bool {
- return n.raftNode.Status().Lead != raft.None
+func (n *Node) IsReady(ctx context.Context) bool {
+ tries := 0
+ for {
+ select {
+ case <-n.shutdown:
+ return false
+ case <-time.After(200 * time.Millisecond):
+ // wait for the leader to be elected
+ if n.GetRaftLead() != raft.None {
+ return true
+ }
+
+ tries++
+ if tries >= 10 {
+ // waiting too long, just return the running
status
+ n.logger.Warn("Leader not elected, return the
running status")
+ return n.isRunning.Load()
+ }
+ case <-ctx.Done():
+ return false
+ }
+ }
}
func (n *Node) LeaderChange() <-chan bool {
@@ -392,8 +404,7 @@ func (n *Node) Delete(ctx context.Context, key string)
error {
}
func (n *Node) List(_ context.Context, prefix string) ([]engine.Entry, error) {
- n.dataStore.List(prefix)
- return nil, nil
+ return n.dataStore.List(prefix), nil
}
func (n *Node) applySnapshot(snapshot raftpb.Snapshot) error {
@@ -437,27 +448,7 @@ func (n *Node) applyEntries(entries []raftpb.Entry) {
func (n *Node) applyEntry(entry raftpb.Entry) error {
switch entry.Type {
case raftpb.EntryNormal:
- // apply entry to the state machine
- if len(entry.Data) == 0 {
- // empty message, skip it.
- return nil
- }
-
- var e Event
- if err := json.Unmarshal(entry.Data, &e); err != nil {
- return err
- }
- switch e.Op {
- case opSet:
- n.dataStore.Set(e.Key, e.Value)
- return nil
- case opDelete:
- n.dataStore.Delete(e.Key)
- case opGet:
- // do nothing
- default:
- return fmt.Errorf("unknown operation type: %d", e.Op)
- }
+ return n.dataStore.applyDataEntry(entry)
case raftpb.EntryConfChangeV2, raftpb.EntryConfChange:
// apply config change to the state machine
var cc raftpb.ConfChange
diff --git a/store/engine/raft/node_test.go b/store/engine/raft/node_test.go
index cb60cb0..1960161 100644
--- a/store/engine/raft/node_test.go
+++ b/store/engine/raft/node_test.go
@@ -185,7 +185,7 @@ func TestCluster_MultiNodes(t *testing.T) {
require.Eventually(t, func() bool {
got, _ := n2.Get(ctx, "foo")
return string(got) == "bar"
- }, 1*time.Second, 100*time.Millisecond)
+ }, 10*time.Second, 100*time.Millisecond)
})
t.Run("works well if 1/3 nodes down", func(t *testing.T) {
diff --git a/store/engine/raft/store.go b/store/engine/raft/store.go
index 4834a31..fd3f773 100644
--- a/store/engine/raft/store.go
+++ b/store/engine/raft/store.go
@@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"os"
+ "slices"
"strings"
"sync"
@@ -135,32 +136,41 @@ func (ds *DataStore) openWAL(snapshot *raftpb.Snapshot)
(*wal.WAL, error) {
return wal.Open(logger.Get(), ds.walDir, walSnapshot)
}
-func (ds *DataStore) replayWAL() error {
+func (ds *DataStore) replayWAL() (*raftpb.Snapshot, error) {
snapshot, err := ds.loadSnapshotFromDisk()
if err != nil {
- return fmt.Errorf("failed to load newest snapshot: %w", err)
+ return nil, fmt.Errorf("failed to load newest snapshot: %w",
err)
}
w, err := ds.openWAL(snapshot)
if err != nil {
- return fmt.Errorf("failed to open WAL: %w", err)
+ return nil, fmt.Errorf("failed to open WAL: %w", err)
}
ds.wal = w
_, hardState, entries, err := w.ReadAll()
if err != nil {
- return fmt.Errorf("failed to read WAL: %w", err)
+ return nil, fmt.Errorf("failed to read WAL: %w", err)
}
if snapshot != nil {
_ = ds.raftStorage.ApplySnapshot(*snapshot)
}
if err := ds.raftStorage.SetHardState(hardState); err != nil {
- return fmt.Errorf("failed to set hard state: %w", err)
+ return nil, fmt.Errorf("failed to set hard state: %w", err)
}
if err := ds.raftStorage.Append(entries); err != nil {
- return fmt.Errorf("failed to append entries: %w", err)
+ return nil, fmt.Errorf("failed to append entries: %w", err)
}
- return nil
+
+ if err := ds.reloadSnapshot(); err != nil {
+ return nil, fmt.Errorf("failed to reload snapshot: %w", err)
+ }
+ for _, entry := range entries {
+ if err := ds.applyDataEntry(entry); err != nil {
+ return nil, fmt.Errorf("failed to apply data entry:
%w", err)
+ }
+ }
+ return snapshot, nil
}
func (ds *DataStore) saveSnapshot(snapshot raftpb.Snapshot) error {
@@ -178,6 +188,28 @@ func (ds *DataStore) saveSnapshot(snapshot
raftpb.Snapshot) error {
return ds.wal.ReleaseLockTo(snapshot.Metadata.Index)
}
+func (ds *DataStore) applyDataEntry(entry raftpb.Entry) error {
+ if entry.Type != raftpb.EntryNormal || len(entry.Data) == 0 {
+ return nil
+ }
+
+ var e Event
+ if err := json.Unmarshal(entry.Data, &e); err != nil {
+ return err
+ }
+ switch e.Op {
+ case opSet:
+ ds.Set(e.Key, e.Value)
+ case opDelete:
+ ds.Delete(e.Key)
+ case opGet:
+ // do nothing
+ default:
+ return fmt.Errorf("unknown operation type: %d", e.Op)
+ }
+ return nil
+}
+
func (ds *DataStore) Set(key string, value []byte) {
ds.mu.Lock()
defer ds.mu.Unlock()
@@ -206,11 +238,14 @@ func (ds *DataStore) List(prefix string) []engine.Entry {
for k := range ds.kvs {
if strings.HasPrefix(k, prefix) {
entries = append(entries, engine.Entry{
- Key: strings.TrimPrefix(k, prefix),
+ Key: strings.TrimLeft(strings.TrimPrefix(k,
prefix), "/"),
Value: ds.kvs[k],
})
}
}
+ slices.SortFunc(entries, func(i, j engine.Entry) int {
+ return strings.Compare(i.Key, j.Key)
+ })
return entries
}
diff --git a/store/engine/raft/store_test.go b/store/engine/raft/store_test.go
index e16e2ae..de378dd 100644
--- a/store/engine/raft/store_test.go
+++ b/store/engine/raft/store_test.go
@@ -21,6 +21,8 @@
package raft
import (
+ "encoding/json"
+ "fmt"
"os"
"testing"
@@ -38,19 +40,23 @@ func TestDataStore(t *testing.T) {
os.RemoveAll(dir)
}()
- err := store.replayWAL()
+ _, err := store.replayWAL()
require.NoError(t, err)
t.Run("reply WAL from the disk", func(t *testing.T) {
- require.NoError(t, store.wal.Save(raftpb.HardState{Term: 1,
Vote: 1}, []raftpb.Entry{
- {Term: 1, Index: 1, Type: raftpb.EntryNormal, Data:
[]byte("test-1")},
- {Term: 1, Index: 2, Type: raftpb.EntryNormal, Data:
[]byte("test-2")},
- {Term: 1, Index: 3, Type: raftpb.EntryNormal, Data:
[]byte("test-3")},
- }))
+ entries := make([]raftpb.Entry, 0)
+ for i := 0; i < 3; i++ {
+ payload, err := json.Marshal(Event{Op: opSet, Key:
fmt.Sprintf("key-%d", i), Value: []byte(fmt.Sprintf("value-%d", i))})
+ require.NoError(t, err)
+ entries = append(entries, raftpb.Entry{Term: 1, Index:
uint64(i + 1), Type: raftpb.EntryNormal, Data: payload})
+ }
+ require.NoError(t, store.wal.Save(raftpb.HardState{Term: 1,
Vote: 1}, entries))
store.Close()
store = NewDataStore(dir)
- require.NoError(t, store.replayWAL())
+ snapshot, err := store.replayWAL()
+ require.NoError(t, err)
+ require.NotNil(t, snapshot)
firstIndex, err := store.raftStorage.FirstIndex()
require.NoError(t, err)
@@ -63,6 +69,12 @@ func TestDataStore(t *testing.T) {
term, err := store.raftStorage.Term(1)
require.NoError(t, err)
require.EqualValues(t, 1, term)
+
+ for i := 0; i < 3; i++ {
+ v, err := store.Get(fmt.Sprintf("key-%d", i))
+ require.NoError(t, err)
+ require.Equal(t, []byte(fmt.Sprintf("value-%d", i)), v)
+ }
})
t.Run("Basic GET/SET/DELETE/LIST", func(t *testing.T) {