This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 7644f77  Use sync/atomic instead of Uber's implementation (#256)
7644f77 is described below

commit 7644f7763b39fb46748f9d15a6b5e9abdea04447
Author: Yaroslav <[email protected]>
AuthorDate: Tue Jan 14 14:59:14 2025 +0200

    Use sync/atomic instead of Uber's implementation (#256)
---
 controller/cluster_test.go          |  9 ++++---
 controller/controller.go            |  6 ++---
 go.mod                              |  1 -
 go.sum                              |  8 ------
 server/api/cluster_test.go          |  2 +-
 server/api/shard_test.go            | 14 ++++++-----
 store/cluster.go                    | 50 +++++++++++++++++++++++++++++++------
 store/engine/etcd/etcd.go           |  3 ++-
 store/engine/raft/node.go           |  7 +++---
 store/engine/zookeeper/zookeeper.go | 10 ++++----
 store/store.go                      |  2 +-
 store/store_test.go                 |  2 +-
 12 files changed, 71 insertions(+), 43 deletions(-)

diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index 65a8ac6..2fd7db7 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -27,7 +27,6 @@ import (
        "time"
 
        "github.com/stretchr/testify/require"
-       "go.uber.org/atomic"
 
        "github.com/apache/kvrocks-controller/consts"
        "github.com/apache/kvrocks-controller/store"
@@ -66,7 +65,7 @@ func (s *MockClusterStore) GetCluster(ctx context.Context, ns, cluster string) (
 }
 
 func (s *MockClusterStore) UpdateCluster(ctx context.Context, ns string, cluster *store.Cluster) error {
-       cluster.Version.Inc()
+       cluster.Version.Add(1)
        return s.SetCluster(ctx, ns, cluster)
 }
 
@@ -104,8 +103,7 @@ func TestCluster_FailureCount(t *testing.T) {
        mockNode3.Sequence = 101
 
        clusterInfo := &store.Cluster{
-               Name:    clusterName,
-               Version: *atomic.NewInt64(1),
+               Name: clusterName,
                Shards: []*store.Shard{{
                        Nodes: []store.Node{
                                mockNode0, mockNode1, mockNode2, mockNode3,
@@ -115,7 +113,10 @@ func TestCluster_FailureCount(t *testing.T) {
                        TargetShardIndex: -1,
                }},
        }
+       clusterInfo.Version.Store(1)
+
        require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))
+
        cluster := &ClusterChecker{
                clusterStore: s,
                namespace:    ns,
diff --git a/controller/controller.go b/controller/controller.go
index 30bcdfb..179353f 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -23,9 +23,9 @@ import (
        "context"
        "fmt"
        "sync"
+       "sync/atomic"
        "time"
 
-       "go.uber.org/atomic"
        "go.uber.org/zap"
 
        "github.com/apache/kvrocks-controller/config"
@@ -66,7 +66,7 @@ func New(s *store.ClusterStore, config *config.ControllerConfig) (*Controller, e
 }
 
 func (c *Controller) Start(ctx context.Context) error {
-       if !c.state.CAS(stateInit, stateRunning) {
+       if !c.state.CompareAndSwap(stateInit, stateRunning) {
                return nil
        }
 
@@ -234,7 +234,7 @@ func (c *Controller) updateCluster(namespace, clusterName string) {
 }
 
 func (c *Controller) Close() {
-       if !c.state.CAS(stateRunning, stateClosed) {
+       if !c.state.CompareAndSwap(stateRunning, stateClosed) {
                return
        }
 
diff --git a/go.mod b/go.mod
index 5d1c101..9b11498 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,6 @@ require (
        go.etcd.io/etcd/client/v3 v3.5.17
        go.etcd.io/etcd/raft/v3 v3.5.17
        go.etcd.io/etcd/server/v3 v3.5.17
-       go.uber.org/atomic v1.11.0
        go.uber.org/zap v1.27.0
        gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0
 )
diff --git a/go.sum b/go.sum
index 9d9f98c..21e5600 100644
--- a/go.sum
+++ b/go.sum
@@ -79,8 +79,6 @@ github.com/go-playground/validator/v10 v10.23.0 
h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL
 github.com/go-playground/validator/v10 v10.23.0/go.mod 
h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
 github.com/go-redis/redis/v8 v8.11.5 
h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
 github.com/go-redis/redis/v8 v8.11.5/go.mod 
h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
-github.com/go-resty/resty/v2 v2.16.2 
h1:CpRqTjIzq/rweXUt9+GxzzQdlkqMdt8Lm/fuK/CAbAg=
-github.com/go-resty/resty/v2 v2.16.2/go.mod 
h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWaC3iOktwmIU=
 github.com/go-resty/resty/v2 v2.16.3 
h1:zacNT7lt4b8M/io2Ahj6yPypL7bqx9n1iprfQuodV+E=
 github.com/go-resty/resty/v2 v2.16.3/go.mod 
h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
 github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -184,8 +182,6 @@ github.com/mattn/go-colorable v0.1.4/go.mod 
h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
 github.com/mattn/go-colorable v0.1.6/go.mod 
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.9/go.mod 
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.12/go.mod 
h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
-github.com/mattn/go-colorable v0.1.13 
h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
-github.com/mattn/go-colorable v0.1.13/go.mod 
h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
 github.com/mattn/go-colorable v0.1.14 
h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
 github.com/mattn/go-colorable v0.1.14/go.mod 
h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
 github.com/mattn/go-isatty v0.0.3/go.mod 
h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
@@ -193,7 +189,6 @@ github.com/mattn/go-isatty v0.0.8/go.mod 
h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
 github.com/mattn/go-isatty v0.0.11/go.mod 
h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
 github.com/mattn/go-isatty v0.0.12/go.mod 
h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
 github.com/mattn/go-isatty v0.0.14/go.mod 
h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
-github.com/mattn/go-isatty v0.0.16/go.mod 
h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-isatty v0.0.20 
h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 github.com/mattn/go-isatty v0.0.20/go.mod 
h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mattn/go-runewidth v0.0.9/go.mod 
h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
@@ -325,8 +320,6 @@ go.opentelemetry.io/otel/sdk/metric v1.31.0 
h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4Jjx
 go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod 
h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
 go.opentelemetry.io/otel/trace v1.31.0 
h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
 go.opentelemetry.io/otel/trace v1.31.0/go.mod 
h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
-go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
-go.uber.org/atomic v1.11.0/go.mod 
h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/goleak v1.3.0/go.mod 
h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@@ -384,7 +377,6 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod 
h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
 golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 005ba4b..78f5d42 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -144,7 +144,7 @@ func TestClusterBasics(t *testing.T) {
                after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
                require.NoError(t, err)
 
-               require.EqualValues(t, before.Version.Inc(), after.Version.Load())
+               require.EqualValues(t, before.Version.Add(1), after.Version.Load())
                require.Len(t, after.Shards[0].SlotRanges, 2)
                require.EqualValues(t, store.SlotRange{Start: 0, Stop: 2}, after.Shards[0].SlotRanges[0])
                require.EqualValues(t, store.SlotRange{Start: 4, Stop: 8191}, after.Shards[0].SlotRanges[1])
diff --git a/server/api/shard_test.go b/server/api/shard_test.go
index 1bfaa53..c4cd642 100644
--- a/server/api/shard_test.go
+++ b/server/api/shard_test.go
@@ -31,7 +31,6 @@ import (
 
        "github.com/gin-gonic/gin"
        "github.com/stretchr/testify/require"
-       "go.uber.org/atomic"
 
        "github.com/apache/kvrocks-controller/consts"
        "github.com/apache/kvrocks-controller/server/middleware"
@@ -48,11 +47,14 @@ func TestShardBasics(t *testing.T) {
        shard := store.NewShard()
        shard.SlotRanges = []store.SlotRange{{Start: 0, Stop: 16383}}
        shard.Nodes = []store.Node{store.NewClusterNode("127.0.0.1:1234", "")}
-       err := handler.s.CreateCluster(context.Background(), ns, &store.Cluster{
-               Name:    clusterName,
-               Version: *atomic.NewInt64(1),
-               Shards:  []*store.Shard{shard},
-       })
+
+       clusterInfo := &store.Cluster{
+               Name:   clusterName,
+               Shards: []*store.Shard{shard},
+       }
+       clusterInfo.Version.Store(1)
+
+       err := handler.s.CreateCluster(context.Background(), ns, clusterInfo)
        require.NoError(t, err)
 
        runCreate := func(t *testing.T, expectedStatusCode int) {
diff --git a/store/cluster.go b/store/cluster.go
index d4713dd..3bde499 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -22,20 +22,20 @@ package store
 
 import (
        "context"
+       "encoding/json"
        "errors"
        "fmt"
        "sort"
        "strconv"
        "strings"
-
-       "go.uber.org/atomic"
+       "sync/atomic"
 
        "github.com/apache/kvrocks-controller/consts"
 )
 
 type Cluster struct {
        Name    string       `json:"name"`
-       Version atomic.Int64 `json:"version"`
+       Version atomic.Int64 `json:"-"`
        Shards  []*Shard     `json:"shards"`
 }
 
@@ -232,7 +232,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId
 }
 
 func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error {
-       version := cluster.Version.Inc()
+       version := cluster.Version.Add(1)
        for i := 0; i < len(cluster.Shards); i++ {
                for _, node := range cluster.Shards[i].Nodes {
                        clusterNode, ok := node.(*ClusterNode)
@@ -309,8 +309,42 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
                masterNode := shards[i].Nodes[0]
                shards[i].Nodes = append(shards[i].Nodes, slaveNodes[masterNode.ID()]...)
        }
-       return &Cluster{
-               Version: *atomic.NewInt64(clusterVer),
-               Shards:  shards,
-       }, nil
+
+       clusterInfo := &Cluster{
+               Shards: shards,
+       }
+       clusterInfo.Version.Store(clusterVer)
+
+       return clusterInfo, nil
+}
+
+// MarshalJSON is a custom function since the atomic.Int64 type does not directly implement JSON marshaling.
+func (cluster *Cluster) MarshalJSON() ([]byte, error) {
+       type Alias Cluster // to avoid recursion
+
+       return json.Marshal(&struct {
+               Version int64 `json:"version"`
+               *Alias
+       }{
+               Version: cluster.Version.Load(),
+               Alias:   (*Alias)(cluster),
+       })
+}
+
+// UnmarshalJSON is a custom function since the atomic.Int64 type does not directly implement JSON unmarshaling.
+func (cluster *Cluster) UnmarshalJSON(data []byte) error {
+       type Alias Cluster
+
+       aux := &struct {
+               Version int64 `json:"version"`
+               *Alias
+       }{
+               Alias: (*Alias)(cluster),
+       }
+       if err := json.Unmarshal(data, &aux); err != nil {
+               return err
+       }
+
+       cluster.Version.Store(aux.Version)
+       return nil
 }
diff --git a/store/engine/etcd/etcd.go b/store/engine/etcd/etcd.go
index 1e6d46d..93fcbc1 100644
--- a/store/engine/etcd/etcd.go
+++ b/store/engine/etcd/etcd.go
@@ -17,6 +17,7 @@
  * under the License.
  *
  */
+
 package etcd
 
 import (
@@ -24,6 +25,7 @@ import (
        "errors"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/apache/kvrocks-controller/consts"
@@ -31,7 +33,6 @@ import (
        clientv3 "go.etcd.io/etcd/client/v3"
        "go.etcd.io/etcd/client/v3/concurrency"
        "go.etcd.io/etcd/pkg/transport"
-       "go.uber.org/atomic"
        "go.uber.org/zap"
 
        "github.com/apache/kvrocks-controller/logger"
diff --git a/store/engine/raft/node.go b/store/engine/raft/node.go
index c5c9092..38b208a 100644
--- a/store/engine/raft/node.go
+++ b/store/engine/raft/node.go
@@ -29,6 +29,7 @@ import (
        "net/url"
        "os"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/apache/kvrocks-controller/logger"
@@ -39,8 +40,6 @@ import (
        "go.etcd.io/etcd/raft/v3/raftpb"
        "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
        stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
-
-       "go.uber.org/atomic"
        "go.uber.org/zap"
 )
 
@@ -131,7 +130,7 @@ func (n *Node) SetSnapshotThreshold(threshold uint64) {
 
 func (n *Node) run() error {
        // The node is already running
-       if !n.isRunning.CAS(false, true) {
+       if !n.isRunning.CompareAndSwap(false, true) {
                return nil
        }
        n.shutdown = make(chan struct{})
@@ -526,7 +525,7 @@ func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }
 
 func (n *Node) Close() error {
-       if !n.isRunning.CAS(true, false) {
+       if !n.isRunning.CompareAndSwap(true, false) {
                return nil
        }
        close(n.shutdown)
diff --git a/store/engine/zookeeper/zookeeper.go b/store/engine/zookeeper/zookeeper.go
index 7e863a3..e8d8467 100644
--- a/store/engine/zookeeper/zookeeper.go
+++ b/store/engine/zookeeper/zookeeper.go
@@ -17,6 +17,7 @@
  * under the License.
  *
  */
+
 package zookeeper
 
 import (
@@ -24,20 +25,19 @@ import (
        "errors"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/apache/kvrocks-controller/logger"
        "github.com/apache/kvrocks-controller/store/engine"
        "github.com/go-zookeeper/zk"
-       "go.uber.org/atomic"
 )
 
 const (
-       sessionTTL = 6 * time.Second
+       sessionTTL       = 6 * time.Second
+       defaultElectPath = "/kvrocks/controller/leader"
 )
 
-const defaultElectPath = "/kvrocks/controller/leader"
-
 type Config struct {
        Addrs     []string `yaml:"addrs"`
        Scheme    string   `yaml:"scheme"`
@@ -143,7 +143,7 @@ func (e *Zookeeper) Exists(ctx context.Context, key string) (bool, error) {
        return exists, nil
 }
 
-// If the key exists, it will be set; if not, it will be created.
+// Set sets the value for the key. If the key exists, it will be set; if not, it will be created.
 func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error {
        exist, _ := e.Exists(ctx, key)
        if exist {
diff --git a/store/store.go b/store/store.go
index f0f6672..bb0edfc 100644
--- a/store/store.go
+++ b/store/store.go
@@ -184,7 +184,7 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo
                return fmt.Errorf("the cluster has been updated by others")
        }
 
-       clusterInfo.Version.Inc()
+       clusterInfo.Version.Add(1)
        clusterBytes, err := json.Marshal(clusterInfo)
        if err != nil {
                return fmt.Errorf("cluster: %w", err)
diff --git a/store/store_test.go b/store/store_test.go
index f75115c..b0517f7 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -75,7 +75,7 @@ func TestClusterStore(t *testing.T) {
                gotCluster, err := store.GetCluster(ctx, ns, "cluster0")
                require.NoError(t, err)
                require.Equal(t, cluster0.Name, gotCluster.Name)
-               require.Equal(t, cluster0.Version, gotCluster.Version)
+               require.Equal(t, cluster0.Version.Load(), gotCluster.Version.Load())
 
                gotClusters, err := store.ListCluster(ctx, ns)
                require.NoError(t, err)

Reply via email to