This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new e7b28c3 Add `sync.Group` to wait for the election goroutines to exit
for etcd storage (#132)
e7b28c3 is described below
commit e7b28c31c71c0668946e0175e9ca86a1a457a2b1
Author: 纪华裕 <[email protected]>
AuthorDate: Thu Dec 7 20:15:19 2023 +0800
Add `sync.Group` to wait for the election goroutines to exit for etcd
storage (#132)
---
storage/persistence/etcd/etcd.go | 5 +++++
storage/persistence/etcd/etcd_test.go | 9 +++++----
2 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/storage/persistence/etcd/etcd.go b/storage/persistence/etcd/etcd.go
index a73f136..399e831 100644
--- a/storage/persistence/etcd/etcd.go
+++ b/storage/persistence/etcd/etcd.go
@@ -68,6 +68,7 @@ type Etcd struct {
isReady atomic.Bool
quitCh chan struct{}
+ wg sync.WaitGroup
electionCh chan *concurrency.Election
leaderChangeCh chan bool
}
@@ -120,6 +121,7 @@ func New(id string, cfg *Config) (*Etcd, error) {
leaderChangeCh: make(chan bool),
}
e.isReady.Store(false)
+ e.wg.Add(2)
go e.electLoop(context.Background())
go e.observeLeaderEvent(context.Background())
return e, nil
@@ -211,6 +213,7 @@ func (e *Etcd) List(ctx context.Context, prefix string) ([]persistence.Entry, er
}
func (e *Etcd) electLoop(ctx context.Context) {
+ defer e.wg.Done()
for {
select {
case <-e.quitCh:
@@ -249,6 +252,7 @@ func (e *Etcd) electLoop(ctx context.Context) {
}
func (e *Etcd) observeLeaderEvent(ctx context.Context) {
+ defer e.wg.Done()
var election *concurrency.Election
select {
case elect := <-e.electionCh:
@@ -287,5 +291,6 @@ func (e *Etcd) observeLeaderEvent(ctx context.Context) {
func (e *Etcd) Close() error {
close(e.quitCh)
+ e.wg.Wait()
return e.client.Close()
}
diff --git a/storage/persistence/etcd/etcd_test.go b/storage/persistence/etcd/etcd_test.go
index 61caee6..e8e7294 100644
--- a/storage/persistence/etcd/etcd_test.go
+++ b/storage/persistence/etcd/etcd_test.go
@@ -40,6 +40,11 @@ func TestBasicOperations(t *testing.T) {
})
require.NoError(t, err)
defer persist.Close()
+ go func() {
+ for range persist.LeaderChange() {
+ // do nothing
+ }
+ }()
ctx := context.Background()
keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
@@ -82,7 +87,6 @@ func TestElect(t *testing.T) {
return node1.Leader() == node0.myID
}, 10*time.Second, 100*time.Millisecond, "node1's leader should be the node0")
- shutdown := make(chan struct{})
go func() {
for {
select {
@@ -90,8 +94,6 @@ func TestElect(t *testing.T) {
// do nothing
case <-node1.LeaderChange():
// do nothing
- case <-shutdown:
- return
}
}
}()
@@ -101,5 +103,4 @@ func TestElect(t *testing.T) {
require.Eventuallyf(t, func() bool {
return node1.Leader() == node1.myID
}, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
- close(shutdown)
}