This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new e7b28c3  Add `sync.Group` to wait for the election goroutines to exit 
for etcd storage (#132)
e7b28c3 is described below

commit e7b28c31c71c0668946e0175e9ca86a1a457a2b1
Author: 纪华裕 <[email protected]>
AuthorDate: Thu Dec 7 20:15:19 2023 +0800

    Add `sync.Group` to wait for the election goroutines to exit for etcd 
storage (#132)
---
 storage/persistence/etcd/etcd.go      | 5 +++++
 storage/persistence/etcd/etcd_test.go | 9 +++++----
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/storage/persistence/etcd/etcd.go b/storage/persistence/etcd/etcd.go
index a73f136..399e831 100644
--- a/storage/persistence/etcd/etcd.go
+++ b/storage/persistence/etcd/etcd.go
@@ -68,6 +68,7 @@ type Etcd struct {
        isReady   atomic.Bool
 
        quitCh         chan struct{}
+       wg             sync.WaitGroup
        electionCh     chan *concurrency.Election
        leaderChangeCh chan bool
 }
@@ -120,6 +121,7 @@ func New(id string, cfg *Config) (*Etcd, error) {
                leaderChangeCh: make(chan bool),
        }
        e.isReady.Store(false)
+       e.wg.Add(2)
        go e.electLoop(context.Background())
        go e.observeLeaderEvent(context.Background())
        return e, nil
@@ -211,6 +213,7 @@ func (e *Etcd) List(ctx context.Context, prefix string) 
([]persistence.Entry, er
 }
 
 func (e *Etcd) electLoop(ctx context.Context) {
+       defer e.wg.Done()
        for {
                select {
                case <-e.quitCh:
@@ -249,6 +252,7 @@ func (e *Etcd) electLoop(ctx context.Context) {
 }
 
 func (e *Etcd) observeLeaderEvent(ctx context.Context) {
+       defer e.wg.Done()
        var election *concurrency.Election
        select {
        case elect := <-e.electionCh:
@@ -287,5 +291,6 @@ func (e *Etcd) observeLeaderEvent(ctx context.Context) {
 
 func (e *Etcd) Close() error {
        close(e.quitCh)
+       e.wg.Wait()
        return e.client.Close()
 }
diff --git a/storage/persistence/etcd/etcd_test.go 
b/storage/persistence/etcd/etcd_test.go
index 61caee6..e8e7294 100644
--- a/storage/persistence/etcd/etcd_test.go
+++ b/storage/persistence/etcd/etcd_test.go
@@ -40,6 +40,11 @@ func TestBasicOperations(t *testing.T) {
        })
        require.NoError(t, err)
        defer persist.Close()
+       go func() {
+               for range persist.LeaderChange() {
+                       // do nothing
+               }
+       }()
 
        ctx := context.Background()
        keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
@@ -82,7 +87,6 @@ func TestElect(t *testing.T) {
                return node1.Leader() == node0.myID
        }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the 
node0")
 
-       shutdown := make(chan struct{})
        go func() {
                for {
                        select {
@@ -90,8 +94,6 @@ func TestElect(t *testing.T) {
                                // do nothing
                        case <-node1.LeaderChange():
                                // do nothing
-                       case <-shutdown:
-                               return
                        }
                }
        }()
@@ -101,5 +103,4 @@ func TestElect(t *testing.T) {
        require.Eventuallyf(t, func() bool {
                return node1.Leader() == node1.myID
        }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
-       close(shutdown)
 }

Reply via email to