This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new dd7a8c7 Fix data race when updating the raft snapshot and compact
threshold (#253)
dd7a8c7 is described below
commit dd7a8c780049390db8c860274cd2213b2b7635a1
Author: hulk <[email protected]>
AuthorDate: Tue Jan 7 20:38:02 2025 +0800
Fix data race when updating the raft snapshot and compact threshold (#253)
---
store/engine/raft/node.go | 31 ++++++++++++++-----------------
1 file changed, 14 insertions(+), 17 deletions(-)
diff --git a/store/engine/raft/node.go b/store/engine/raft/node.go
index 70a7069..c5c9092 100644
--- a/store/engine/raft/node.go
+++ b/store/engine/raft/node.go
@@ -73,13 +73,12 @@ type Node struct {
logger *zap.Logger
peers sync.Map
- mu sync.Mutex
leader uint64
appliedIndex uint64
snapshotIndex uint64
confState raftpb.ConfState
- snapshotThreshold uint64
- compactThreshold uint64
+ snapshotThreshold atomic.Uint64
+ compactThreshold atomic.Uint64
wg sync.WaitGroup
shutdown chan struct{}
@@ -97,14 +96,14 @@ func New(config *Config) (*Node, error) {
logger := logger.Get().With(zap.Uint64("node_id", config.ID))
n := &Node{
- config: config,
- leader: raft.None,
- dataStore: NewDataStore(config.DataDir),
- leaderChanged: make(chan bool),
- snapshotThreshold: defaultSnapshotThreshold,
- compactThreshold: defaultCompactThreshold,
- logger: logger,
- }
+ config: config,
+ leader: raft.None,
+ dataStore: NewDataStore(config.DataDir),
+ leaderChanged: make(chan bool),
+ logger: logger,
+ }
+ n.snapshotThreshold.Store(defaultSnapshotThreshold)
+ n.compactThreshold.Store(defaultCompactThreshold)
if err := n.run(); err != nil {
return nil, err
}
@@ -127,9 +126,7 @@ func (n *Node) ListPeers() map[uint64]string {
}
func (n *Node) SetSnapshotThreshold(threshold uint64) {
- n.mu.Lock()
- defer n.mu.Unlock()
- n.snapshotThreshold = threshold
+ n.snapshotThreshold.Store(threshold)
}
func (n *Node) run() error {
@@ -309,7 +306,7 @@ func (n *Node) runRaftMessages() error {
}
func (n *Node) triggerSnapshotIfNeed() error {
- if n.appliedIndex-n.snapshotIndex <= n.snapshotThreshold {
+ if n.appliedIndex-n.snapshotIndex <= n.snapshotThreshold.Load() {
return nil
}
snapshotBytes, err := n.dataStore.GetDataStoreSnapshot()
@@ -325,8 +322,8 @@ func (n *Node) triggerSnapshotIfNeed() error {
}
compactIndex := uint64(1)
- if n.appliedIndex > n.compactThreshold {
- compactIndex = n.appliedIndex - n.compactThreshold
+ if n.appliedIndex > n.compactThreshold.Load() {
+ compactIndex = n.appliedIndex - n.compactThreshold.Load()
}
if err := n.dataStore.raftStorage.Compact(compactIndex); err != nil &&
!errors.Is(err, raft.ErrCompacted) {
return err