This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 7644f77 Use sync/atomic instead of Uber's implementation (#256)
7644f77 is described below
commit 7644f7763b39fb46748f9d15a6b5e9abdea04447
Author: Yaroslav <[email protected]>
AuthorDate: Tue Jan 14 14:59:14 2025 +0200
Use sync/atomic instead of Uber's implementation (#256)
---
controller/cluster_test.go | 9 ++++---
controller/controller.go | 6 ++---
go.mod | 1 -
go.sum | 8 ------
server/api/cluster_test.go | 2 +-
server/api/shard_test.go | 14 ++++++-----
store/cluster.go | 50 +++++++++++++++++++++++++++++++------
store/engine/etcd/etcd.go | 3 ++-
store/engine/raft/node.go | 7 +++---
store/engine/zookeeper/zookeeper.go | 10 ++++----
store/store.go | 2 +-
store/store_test.go | 2 +-
12 files changed, 71 insertions(+), 43 deletions(-)
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index 65a8ac6..2fd7db7 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -27,7 +27,6 @@ import (
"time"
"github.com/stretchr/testify/require"
- "go.uber.org/atomic"
"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/store"
@@ -66,7 +65,7 @@ func (s *MockClusterStore) GetCluster(ctx context.Context,
ns, cluster string) (
}
func (s *MockClusterStore) UpdateCluster(ctx context.Context, ns string,
cluster *store.Cluster) error {
- cluster.Version.Inc()
+ cluster.Version.Add(1)
return s.SetCluster(ctx, ns, cluster)
}
@@ -104,8 +103,7 @@ func TestCluster_FailureCount(t *testing.T) {
mockNode3.Sequence = 101
clusterInfo := &store.Cluster{
- Name: clusterName,
- Version: *atomic.NewInt64(1),
+ Name: clusterName,
Shards: []*store.Shard{{
Nodes: []store.Node{
mockNode0, mockNode1, mockNode2, mockNode3,
@@ -115,7 +113,10 @@ func TestCluster_FailureCount(t *testing.T) {
TargetShardIndex: -1,
}},
}
+ clusterInfo.Version.Store(1)
+
require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))
+
cluster := &ClusterChecker{
clusterStore: s,
namespace: ns,
diff --git a/controller/controller.go b/controller/controller.go
index 30bcdfb..179353f 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -23,9 +23,9 @@ import (
"context"
"fmt"
"sync"
+ "sync/atomic"
"time"
- "go.uber.org/atomic"
"go.uber.org/zap"
"github.com/apache/kvrocks-controller/config"
@@ -66,7 +66,7 @@ func New(s *store.ClusterStore, config
*config.ControllerConfig) (*Controller, e
}
func (c *Controller) Start(ctx context.Context) error {
- if !c.state.CAS(stateInit, stateRunning) {
+ if !c.state.CompareAndSwap(stateInit, stateRunning) {
return nil
}
@@ -234,7 +234,7 @@ func (c *Controller) updateCluster(namespace, clusterName
string) {
}
func (c *Controller) Close() {
- if !c.state.CAS(stateRunning, stateClosed) {
+ if !c.state.CompareAndSwap(stateRunning, stateClosed) {
return
}
diff --git a/go.mod b/go.mod
index 5d1c101..9b11498 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,6 @@ require (
go.etcd.io/etcd/client/v3 v3.5.17
go.etcd.io/etcd/raft/v3 v3.5.17
go.etcd.io/etcd/server/v3 v3.5.17
- go.uber.org/atomic v1.11.0
go.uber.org/zap v1.27.0
gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0
)
diff --git a/go.sum b/go.sum
index 9d9f98c..21e5600 100644
--- a/go.sum
+++ b/go.sum
@@ -79,8 +79,6 @@ github.com/go-playground/validator/v10 v10.23.0
h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL
github.com/go-playground/validator/v10 v10.23.0/go.mod
h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-redis/redis/v8 v8.11.5
h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod
h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
-github.com/go-resty/resty/v2 v2.16.2
h1:CpRqTjIzq/rweXUt9+GxzzQdlkqMdt8Lm/fuK/CAbAg=
-github.com/go-resty/resty/v2 v2.16.2/go.mod
h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWaC3iOktwmIU=
github.com/go-resty/resty/v2 v2.16.3
h1:zacNT7lt4b8M/io2Ahj6yPypL7bqx9n1iprfQuodV+E=
github.com/go-resty/resty/v2 v2.16.3/go.mod
h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/go-stack/stack v1.8.0/go.mod
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -184,8 +182,6 @@ github.com/mattn/go-colorable v0.1.4/go.mod
h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-colorable v0.1.6/go.mod
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod
h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
-github.com/mattn/go-colorable v0.1.13
h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
-github.com/mattn/go-colorable v0.1.13/go.mod
h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-colorable v0.1.14
h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod
h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.3/go.mod
h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
@@ -193,7 +189,6 @@ github.com/mattn/go-isatty v0.0.8/go.mod
h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.11/go.mod
h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod
h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod
h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
-github.com/mattn/go-isatty v0.0.16/go.mod
h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20
h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod
h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.9/go.mod
h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
@@ -325,8 +320,6 @@ go.opentelemetry.io/otel/sdk/metric v1.31.0
h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4Jjx
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod
h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.31.0
h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
go.opentelemetry.io/otel/trace v1.31.0/go.mod
h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
-go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
-go.uber.org/atomic v1.11.0/go.mod
h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@@ -384,7 +377,6 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod
h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 005ba4b..78f5d42 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -144,7 +144,7 @@ func TestClusterBasics(t *testing.T) {
after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
require.NoError(t, err)
- require.EqualValues(t, before.Version.Inc(),
after.Version.Load())
+ require.EqualValues(t, before.Version.Add(1),
after.Version.Load())
require.Len(t, after.Shards[0].SlotRanges, 2)
require.EqualValues(t, store.SlotRange{Start: 0, Stop: 2},
after.Shards[0].SlotRanges[0])
require.EqualValues(t, store.SlotRange{Start: 4, Stop: 8191},
after.Shards[0].SlotRanges[1])
diff --git a/server/api/shard_test.go b/server/api/shard_test.go
index 1bfaa53..c4cd642 100644
--- a/server/api/shard_test.go
+++ b/server/api/shard_test.go
@@ -31,7 +31,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
- "go.uber.org/atomic"
"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/server/middleware"
@@ -48,11 +47,14 @@ func TestShardBasics(t *testing.T) {
shard := store.NewShard()
shard.SlotRanges = []store.SlotRange{{Start: 0, Stop: 16383}}
shard.Nodes = []store.Node{store.NewClusterNode("127.0.0.1:1234", "")}
- err := handler.s.CreateCluster(context.Background(), ns, &store.Cluster{
- Name: clusterName,
- Version: *atomic.NewInt64(1),
- Shards: []*store.Shard{shard},
- })
+
+ clusterInfo := &store.Cluster{
+ Name: clusterName,
+ Shards: []*store.Shard{shard},
+ }
+ clusterInfo.Version.Store(1)
+
+ err := handler.s.CreateCluster(context.Background(), ns, clusterInfo)
require.NoError(t, err)
runCreate := func(t *testing.T, expectedStatusCode int) {
diff --git a/store/cluster.go b/store/cluster.go
index d4713dd..3bde499 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -22,20 +22,20 @@ package store
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"
-
- "go.uber.org/atomic"
+ "sync/atomic"
"github.com/apache/kvrocks-controller/consts"
)
type Cluster struct {
Name string `json:"name"`
- Version atomic.Int64 `json:"version"`
+ Version atomic.Int64 `json:"-"`
Shards []*Shard `json:"shards"`
}
@@ -232,7 +232,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context,
slot int, targetShardId
}
func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID
string) error {
- version := cluster.Version.Inc()
+ version := cluster.Version.Add(1)
for i := 0; i < len(cluster.Shards); i++ {
for _, node := range cluster.Shards[i].Nodes {
clusterNode, ok := node.(*ClusterNode)
@@ -309,8 +309,42 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
masterNode := shards[i].Nodes[0]
shards[i].Nodes = append(shards[i].Nodes,
slaveNodes[masterNode.ID()]...)
}
- return &Cluster{
- Version: *atomic.NewInt64(clusterVer),
- Shards: shards,
- }, nil
+
+ clusterInfo := &Cluster{
+ Shards: shards,
+ }
+ clusterInfo.Version.Store(clusterVer)
+
+ return clusterInfo, nil
+}
+
+// MarshalJSON is a custom function since the atomic.Int64 type does not directly implement JSON marshaling.
+func (cluster *Cluster) MarshalJSON() ([]byte, error) {
+ type Alias Cluster // to avoid recursion
+
+ return json.Marshal(&struct {
+ Version int64 `json:"version"`
+ *Alias
+ }{
+ Version: cluster.Version.Load(),
+ Alias: (*Alias)(cluster),
+ })
+}
+
+// UnmarshalJSON is a custom function since the atomic.Int64 type does not directly implement JSON unmarshaling.
+func (cluster *Cluster) UnmarshalJSON(data []byte) error {
+ type Alias Cluster
+
+ aux := &struct {
+ Version int64 `json:"version"`
+ *Alias
+ }{
+ Alias: (*Alias)(cluster),
+ }
+ if err := json.Unmarshal(data, &aux); err != nil {
+ return err
+ }
+
+ cluster.Version.Store(aux.Version)
+ return nil
}
diff --git a/store/engine/etcd/etcd.go b/store/engine/etcd/etcd.go
index 1e6d46d..93fcbc1 100644
--- a/store/engine/etcd/etcd.go
+++ b/store/engine/etcd/etcd.go
@@ -17,6 +17,7 @@
* under the License.
*
*/
+
package etcd
import (
@@ -24,6 +25,7 @@ import (
"errors"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/apache/kvrocks-controller/consts"
@@ -31,7 +33,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/pkg/transport"
- "go.uber.org/atomic"
"go.uber.org/zap"
"github.com/apache/kvrocks-controller/logger"
diff --git a/store/engine/raft/node.go b/store/engine/raft/node.go
index c5c9092..38b208a 100644
--- a/store/engine/raft/node.go
+++ b/store/engine/raft/node.go
@@ -29,6 +29,7 @@ import (
"net/url"
"os"
"sync"
+ "sync/atomic"
"time"
"github.com/apache/kvrocks-controller/logger"
@@ -39,8 +40,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
-
- "go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -131,7 +130,7 @@ func (n *Node) SetSnapshotThreshold(threshold uint64) {
func (n *Node) run() error {
// The node is already running
- if !n.isRunning.CAS(false, true) {
+ if !n.isRunning.CompareAndSwap(false, true) {
return nil
}
n.shutdown = make(chan struct{})
@@ -526,7 +525,7 @@ func (n *Node) ReportSnapshot(id uint64, status
raft.SnapshotStatus) {
}
func (n *Node) Close() error {
- if !n.isRunning.CAS(true, false) {
+ if !n.isRunning.CompareAndSwap(true, false) {
return nil
}
close(n.shutdown)
diff --git a/store/engine/zookeeper/zookeeper.go
b/store/engine/zookeeper/zookeeper.go
index 7e863a3..e8d8467 100644
--- a/store/engine/zookeeper/zookeeper.go
+++ b/store/engine/zookeeper/zookeeper.go
@@ -17,6 +17,7 @@
* under the License.
*
*/
+
package zookeeper
import (
@@ -24,20 +25,19 @@ import (
"errors"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/store/engine"
"github.com/go-zookeeper/zk"
- "go.uber.org/atomic"
)
const (
- sessionTTL = 6 * time.Second
+ sessionTTL = 6 * time.Second
+ defaultElectPath = "/kvrocks/controller/leader"
)
-const defaultElectPath = "/kvrocks/controller/leader"
-
type Config struct {
Addrs []string `yaml:"addrs"`
Scheme string `yaml:"scheme"`
@@ -143,7 +143,7 @@ func (e *Zookeeper) Exists(ctx context.Context, key string)
(bool, error) {
return exists, nil
}
-// If the key exists, it will be set; if not, it will be created.
+// Set sets the value for the key. If the key exists, it will be set; if not, it will be created.
func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error {
exist, _ := e.Exists(ctx, key)
if exist {
diff --git a/store/store.go b/store/store.go
index f0f6672..bb0edfc 100644
--- a/store/store.go
+++ b/store/store.go
@@ -184,7 +184,7 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context,
ns string, clusterInfo
return fmt.Errorf("the cluster has been updated by others")
}
- clusterInfo.Version.Inc()
+ clusterInfo.Version.Add(1)
clusterBytes, err := json.Marshal(clusterInfo)
if err != nil {
return fmt.Errorf("cluster: %w", err)
diff --git a/store/store_test.go b/store/store_test.go
index f75115c..b0517f7 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -75,7 +75,7 @@ func TestClusterStore(t *testing.T) {
gotCluster, err := store.GetCluster(ctx, ns, "cluster0")
require.NoError(t, err)
require.Equal(t, cluster0.Name, gotCluster.Name)
- require.Equal(t, cluster0.Version, gotCluster.Version)
+ require.Equal(t, cluster0.Version.Load(),
gotCluster.Version.Load())
gotClusters, err := store.ListCluster(ctx, ns)
require.NoError(t, err)