This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go-samples.git
The following commit(s) were added to refs/heads/main by this push:
new d2c0f9ec refactor(samples): avoid time.Sleep in tests and examples (#1029)
d2c0f9ec is described below
commit d2c0f9ec1dde5e46fac107257659f37cc75369d5
Author: Carve_ <[email protected]>
AuthorDate: Wed Feb 11 21:57:32 2026 +0800
refactor(samples): avoid time.Sleep in tests and examples (#1029)
* refactor(samples): avoid time.Sleep in tests and examples
* chore(format): run imports-formatter
* remove unnecessary timer.Stop()
---
async/go-client/cmd/main.go | 27 ++++++++--
config_center/nacos/go-client/cmd/main.go | 61 ++++++++++++++++++----
config_center/nacos/go-server/cmd/main.go | 61 ++++++++++++++++++----
config_center/zookeeper/go-client/cmd/main.go | 25 +++++++--
config_center/zookeeper/go-server/cmd/main.go | 25 +++++++--
filter/sentinel/go-client/cmd/main.go | 48 +++++++++++++++--
.../async/tests/integration/async_test.go | 17 ++++--
.../nacos/tests/integration/main_test.go | 20 ++++++-
.../zookeeper/tests/integration/main_test.go | 23 +++++++-
.../sentinel/tests/integration/sentinel_test.go | 49 ++++++++---------
10 files changed, 286 insertions(+), 70 deletions(-)
diff --git a/async/go-client/cmd/main.go b/async/go-client/cmd/main.go
index 237af344..c14a8218 100644
--- a/async/go-client/cmd/main.go
+++ b/async/go-client/cmd/main.go
@@ -65,14 +65,31 @@ func main() {
func testAsync() {
req := &user.GetUserRequest{Id: "003"}
- _, err := userProvider.GetUser(context.Background(), req)
- if err != nil {
- panic(err)
- }
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+
+ respCh := make(chan *user.GetUserResponse, 1)
+ errCh := make(chan error, 1)
+
+ go func() {
+ resp, err := userProvider.GetUser(ctx, req)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ respCh <- resp
+ }()
logger.Info("non-blocking before async callback resp: do something ...
")
- time.Sleep(time.Second)
+ select {
+ case err := <-errCh:
+ panic(err)
+ case resp := <-respCh:
+ logger.Infof("async callback resp: %+v", resp)
+ case <-ctx.Done():
+ logger.Warnf("async callback timeout: %v", ctx.Err())
+ }
}
func testAsyncOneWay() {
diff --git a/config_center/nacos/go-client/cmd/main.go b/config_center/nacos/go-client/cmd/main.go
index 481015c7..5b925de5 100644
--- a/config_center/nacos/go-client/cmd/main.go
+++ b/config_center/nacos/go-client/cmd/main.go
@@ -19,6 +19,8 @@ package main
import (
"context"
+ "fmt"
+ "strings"
"time"
)
@@ -30,6 +32,7 @@ import (
"github.com/dubbogo/gost/log/logger"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
+ "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
@@ -74,19 +77,16 @@ func main() {
panic(err)
}
- success, err := configClient.PublishConfig(vo.ConfigParam{
- DataId: "dubbo-go-samples-configcenter-nacos-go-client",
- Group: "dubbo",
- Content: configCenterNacosClientConfig,
- })
- if err != nil {
+ if err := publishAndWaitConfig(
+ configClient,
+ "dubbo-go-samples-configcenter-nacos-go-client",
+ "dubbo",
+ configCenterNacosClientConfig,
+ 10*time.Second,
+ 200*time.Millisecond,
+ ); err != nil {
panic(err)
}
- if !success {
- return
- }
-
- time.Sleep(time.Second * 10)
nacosOption := config_center.WithNacos()
dataIdOption :=
config_center.WithDataID("dubbo-go-samples-configcenter-nacos-go-client")
@@ -115,3 +115,42 @@ func main() {
}
logger.Infof("Server response: %s", resp)
}
+
+func publishAndWaitConfig(
+ configClient config_client.IConfigClient,
+ dataID string,
+ group string,
+ content string,
+ timeout time.Duration,
+ pollInterval time.Duration,
+) error {
+ success, err := configClient.PublishConfig(vo.ConfigParam{
+ DataId: dataID,
+ Group: group,
+ Content: content,
+ })
+ if err != nil {
+ return err
+ }
+ if !success {
+ return fmt.Errorf("publish config failed")
+ }
+
+ deadline := time.Now().Add(timeout)
+ for {
+ current, err := configClient.GetConfig(vo.ConfigParam{
+ DataId: dataID,
+ Group: group,
+ })
+ if err == nil && strings.TrimSpace(current) ==
strings.TrimSpace(content) {
+ return nil
+ }
+ if time.Now().After(deadline) {
+ if err != nil {
+ return err
+ }
+ return fmt.Errorf("wait for config center timeout")
+ }
+ time.Sleep(pollInterval)
+ }
+}
diff --git a/config_center/nacos/go-server/cmd/main.go b/config_center/nacos/go-server/cmd/main.go
index c03e7330..1fb6284a 100644
--- a/config_center/nacos/go-server/cmd/main.go
+++ b/config_center/nacos/go-server/cmd/main.go
@@ -19,6 +19,8 @@ package main
import (
"context"
+ "fmt"
+ "strings"
"time"
)
@@ -30,6 +32,7 @@ import (
"github.com/dubbogo/gost/log/logger"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
+ "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
@@ -86,19 +89,16 @@ func main() {
panic(err)
}
- success, err := configClient.PublishConfig(vo.ConfigParam{
- DataId: "dubbo-go-samples-configcenter-nacos-go-server",
- Group: "dubbo",
- Content: configCenterNacosServerConfig,
- })
- if err != nil {
+ if err := publishAndWaitConfig(
+ configClient,
+ "dubbo-go-samples-configcenter-nacos-go-server",
+ "dubbo",
+ configCenterNacosServerConfig,
+ 10*time.Second,
+ 200*time.Millisecond,
+ ); err != nil {
panic(err)
}
- if !success {
- return
- }
-
- time.Sleep(time.Second * 10)
nacosOption := config_center.WithNacos()
dataIdOption :=
config_center.WithDataID("dubbo-go-samples-configcenter-nacos-go-server")
@@ -123,3 +123,42 @@ func main() {
logger.Error(err)
}
}
+
+func publishAndWaitConfig(
+ configClient config_client.IConfigClient,
+ dataID string,
+ group string,
+ content string,
+ timeout time.Duration,
+ pollInterval time.Duration,
+) error {
+ success, err := configClient.PublishConfig(vo.ConfigParam{
+ DataId: dataID,
+ Group: group,
+ Content: content,
+ })
+ if err != nil {
+ return err
+ }
+ if !success {
+ return fmt.Errorf("publish config failed")
+ }
+
+ deadline := time.Now().Add(timeout)
+ for {
+ current, err := configClient.GetConfig(vo.ConfigParam{
+ DataId: dataID,
+ Group: group,
+ })
+ if err == nil && strings.TrimSpace(current) ==
strings.TrimSpace(content) {
+ return nil
+ }
+ if time.Now().After(deadline) {
+ if err != nil {
+ return err
+ }
+ return fmt.Errorf("wait for config center timeout")
+ }
+ time.Sleep(pollInterval)
+ }
+}
diff --git a/config_center/zookeeper/go-client/cmd/main.go b/config_center/zookeeper/go-client/cmd/main.go
index 188d7569..69fb2384 100644
--- a/config_center/zookeeper/go-client/cmd/main.go
+++ b/config_center/zookeeper/go-client/cmd/main.go
@@ -47,9 +47,6 @@ func main() {
}
logger.Info("Successfully wrote config to ZooKeeper")
- // wait for config write to finish
- time.Sleep(time.Second * 3)
-
// configure Dubbo instance
zkOption := config_center.WithZookeeper()
dataIdOption :=
config_center.WithDataID("dubbo-go-samples-configcenter-zookeeper-go-client")
@@ -132,9 +129,31 @@ func writeRuleToConfigCenter() error {
logger.Info("Created new config node")
}
+ if err := waitForConfigReady(c, path, valueBytes, 10*time.Second); err
!= nil {
+ return perrors.Wrap(err, "wait for config ready")
+ }
+
return nil
}
+func waitForConfigReady(c *zk.Conn, path string, expected []byte, timeout
time.Duration) error {
+ deadline := time.Now().Add(timeout)
+ expectedStr := strings.TrimSpace(string(expected))
+ for {
+ data, _, err := c.Get(path)
+ if err == nil && strings.TrimSpace(string(data)) == expectedStr
{
+ return nil
+ }
+ if time.Now().After(deadline) {
+ if err != nil {
+ return perrors.Wrap(err, "wait for config
timeout")
+ }
+ return perrors.New("wait for config timeout")
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
+}
+
// helper function to create parent paths
func createParentPaths(c *zk.Conn, path string) error {
paths := strings.Split(path, "/")
diff --git a/config_center/zookeeper/go-server/cmd/main.go b/config_center/zookeeper/go-server/cmd/main.go
index 8a52e304..c8b2b0df 100644
--- a/config_center/zookeeper/go-server/cmd/main.go
+++ b/config_center/zookeeper/go-server/cmd/main.go
@@ -57,9 +57,6 @@ func main() {
panic(err)
}
- // Wait for the configuration to take effect
- time.Sleep(time.Second * 10)
-
ins, err := dubbo.NewInstance(
dubbo.WithConfigCenter(
config_center.WithZookeeper(),
@@ -151,9 +148,31 @@ func writeRuleToConfigCenter() error {
logger.Info("Created new configuration in config center")
}
+ if err := waitForConfigReady(c, path, valueBytes, 10*time.Second); err
!= nil {
+ return perrors.Wrap(err, "wait for config ready")
+ }
+
return nil
}
+func waitForConfigReady(c *zk.Conn, path string, expected []byte, timeout
time.Duration) error {
+ deadline := time.Now().Add(timeout)
+ expectedStr := strings.TrimSpace(string(expected))
+ for {
+ data, _, err := c.Get(path)
+ if err == nil && strings.TrimSpace(string(data)) == expectedStr
{
+ return nil
+ }
+ if time.Now().After(deadline) {
+ if err != nil {
+ return perrors.Wrap(err, "wait for config
timeout")
+ }
+ return perrors.New("wait for config timeout")
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
+}
+
// createParentPaths Create parent paths
func createParentPaths(c *zk.Conn, path string) error {
paths := strings.Split(path, "/")
diff --git a/filter/sentinel/go-client/cmd/main.go b/filter/sentinel/go-client/cmd/main.go
index 18303bb0..59c88d6f 100644
--- a/filter/sentinel/go-client/cmd/main.go
+++ b/filter/sentinel/go-client/cmd/main.go
@@ -43,18 +43,43 @@ import (
type GreetFun func(ctx context.Context, req *greet.GreetRequest, opts
...client.CallOption) (*greet.GreetResponse, error)
type stateChangeTestListener struct {
+ openCh chan struct{}
+ halfOpenCh chan struct{}
+ closedCh chan struct{}
}
func (s *stateChangeTestListener) OnTransformToClosed(prev
circuitbreaker.State, rule circuitbreaker.Rule) {
logger.Infof("rule.steategy: %+v, From %s to Closed, time: %d\n",
rule.Strategy, prev.String(), util.CurrentTimeMillis())
+ s.notify(s.closedCh)
}
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State,
rule circuitbreaker.Rule, snapshot interface{}) {
logger.Infof("rule.steategy: %+v, From %s to Open, snapshot: %.2f,
time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
+ s.notify(s.openCh)
}
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev
circuitbreaker.State, rule circuitbreaker.Rule) {
logger.Infof("rule.steategy: %+v, From %s to Half-Open, time: %d\n",
rule.Strategy, prev.String(), util.CurrentTimeMillis())
+ s.notify(s.halfOpenCh)
+}
+
+func (s *stateChangeTestListener) notify(ch chan struct{}) {
+ if ch == nil {
+ return
+ }
+ select {
+ case ch <- struct{}{}:
+ default:
+ }
+}
+
+func waitForState(ch <-chan struct{}, timeout time.Duration) bool {
+ select {
+ case <-ch:
+ return true
+ case <-time.After(timeout):
+ return false
+ }
}
func main() {
@@ -69,8 +94,13 @@ func main() {
if err != nil {
panic(err)
}
+ listener := &stateChangeTestListener{
+ openCh: make(chan struct{}, 1),
+ halfOpenCh: make(chan struct{}, 1),
+ closedCh: make(chan struct{}, 1),
+ }
// Register a state change listener so that we could observe the state
change of the internal circuit breaker.
- circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
+ circuitbreaker.RegisterStateChangeListeners(listener)
_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
// Statistic time span=1s, recoveryTimeout=1s, maxErrorRatio=40%
{
@@ -86,6 +116,7 @@ func main() {
if err != nil {
panic(err)
}
+ retryTimeout := 2 * time.Second
_, err = isolation.LoadRules([]*isolation.Rule{
{
@@ -106,10 +137,19 @@ func main() {
logger.Info("call svc.GreetWithChanceOfError triggers circuit breaker
open")
CallGreetFunConcurrently(svc.GreetWithChanceOfError, "error", 1, 300)
- logger.Info("wait circuit breaker HalfOpen")
- time.Sleep(3 * time.Second)
+ if !waitForState(listener.openCh, 5*time.Second) {
+ logger.Warn("wait circuit breaker Open timeout")
+ }
+ logger.Info("wait circuit breaker HalfOpen window")
+ timer := time.NewTimer(retryTimeout + 200*time.Millisecond)
+ <-timer.C
CallGreetFunConcurrently(svc.GreetWithChanceOfError, "hello world", 1,
300)
- time.Sleep(10 * time.Second)
+ if !waitForState(listener.halfOpenCh, 5*time.Second) {
+ logger.Warn("wait circuit breaker HalfOpen timeout")
+ }
+ if !waitForState(listener.closedCh, 5*time.Second) {
+ logger.Warn("wait circuit breaker Closed timeout")
+ }
}
func CallGreetFunConcurrently(f GreetFun, req string, numberOfConcurrently,
frequency int) (pass int64, block int64) {
diff --git a/integrate_test/async/tests/integration/async_test.go b/integrate_test/async/tests/integration/async_test.go
index da1f3b79..173dd414 100644
--- a/integrate_test/async/tests/integration/async_test.go
+++ b/integrate_test/async/tests/integration/async_test.go
@@ -33,10 +33,21 @@ import (
func TestAsync(t *testing.T) {
req := &user.GetUserRequest{Id: "003"}
- _, err := userProvider.GetUser(context.Background(), req)
- assert.Nil(t, err)
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+
+ done := make(chan error, 1)
+ go func() {
+ _, err := userProvider.GetUser(ctx, req)
+ done <- err
+ }()
- time.Sleep(time.Second)
+ select {
+ case err := <-done:
+ assert.Nil(t, err)
+ case <-ctx.Done():
+ assert.Fail(t, "async call timeout", ctx.Err().Error())
+ }
}
func TestAsyncOneWay(t *testing.T) {
diff --git a/integrate_test/config_center/nacos/tests/integration/main_test.go b/integrate_test/config_center/nacos/tests/integration/main_test.go
index 67ef0a68..887bf55e 100644
--- a/integrate_test/config_center/nacos/tests/integration/main_test.go
+++ b/integrate_test/config_center/nacos/tests/integration/main_test.go
@@ -19,6 +19,7 @@ package integration
import (
"os"
+ "strings"
"testing"
"time"
)
@@ -85,7 +86,23 @@ func TestMain(m *testing.M) {
return
}
- time.Sleep(time.Second * 10)
+ deadline := time.Now().Add(10 * time.Second)
+ for {
+ content, err := configClient.GetConfig(vo.ConfigParam{
+ DataId: "dubbo-go-samples-configcenter-nacos-client",
+ Group: "dubbo",
+ })
+ if err == nil && strings.TrimSpace(content) ==
strings.TrimSpace(configCenterNacosClientConfig) {
+ break
+ }
+ if time.Now().After(deadline) {
+ if err != nil {
+ panic(err)
+ }
+ panic("wait for config center timeout")
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
nacosOption := config_center.WithNacos()
dataIdOption :=
config_center.WithDataID("dubbo-go-samples-configcenter-nacos-client")
@@ -107,6 +124,5 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
- time.Sleep(3 * time.Second)
os.Exit(m.Run())
}
diff --git a/integrate_test/config_center/zookeeper/tests/integration/main_test.go b/integrate_test/config_center/zookeeper/tests/integration/main_test.go
index 6fa96551..b5a1dc34 100644
--- a/integrate_test/config_center/zookeeper/tests/integration/main_test.go
+++ b/integrate_test/config_center/zookeeper/tests/integration/main_test.go
@@ -87,7 +87,9 @@ func TestMain(m *testing.M) {
}
}
- time.Sleep(time.Second * 10)
+ if err := waitForConfigReady(c, path, valueBytes, 10*time.Second); err
!= nil {
+ panic(err)
+ }
zkOption := config_center.WithZookeeper()
dataIdOption :=
config_center.WithDataID("dubbo-go-samples-configcenter-zookeeper-client")
@@ -109,6 +111,23 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
- time.Sleep(3 * time.Second)
os.Exit(m.Run())
}
+
+func waitForConfigReady(c *zk.Conn, path string, expected []byte, timeout
time.Duration) error {
+ deadline := time.Now().Add(timeout)
+ expectedStr := strings.TrimSpace(string(expected))
+ for {
+ data, _, err := c.Get(path)
+ if err == nil && strings.TrimSpace(string(data)) == expectedStr
{
+ return nil
+ }
+ if time.Now().After(deadline) {
+ if err != nil {
+ return perrors.Wrap(err, "wait for config
timeout")
+ }
+ return perrors.New("wait for config timeout")
+ }
+ time.Sleep(200 * time.Millisecond)
+ }
+}
diff --git a/integrate_test/filter/sentinel/tests/integration/sentinel_test.go b/integrate_test/filter/sentinel/tests/integration/sentinel_test.go
index 9d257646..a7d96fee 100644
--- a/integrate_test/filter/sentinel/tests/integration/sentinel_test.go
+++ b/integrate_test/filter/sentinel/tests/integration/sentinel_test.go
@@ -123,42 +123,39 @@ func TestSentinelConsumerFilter_ErrorCount(t *testing.T) {
listener.OnTransformToClosedChan = make(chan struct{}, 1)
listener.OnTransformToHalfOpenChan = make(chan struct{}, 1)
circuitbreaker.RegisterStateChangeListeners(listener)
- _, err := circuitbreaker.LoadRules([]*circuitbreaker.Rule{
+ rule := &circuitbreaker.Rule{
// Statistic time span=0.9s, recoveryTimeout=0.9s,
maxErrorCount=50
- {
- Resource:
"dubbo:consumer:greet.SentinelGreetService:::GreetWithChanceOfError()",
- Strategy: circuitbreaker.ErrorCount,
- RetryTimeoutMs: 900,
- MinRequestAmount: 10,
- StatIntervalMs: 900,
- StatSlidingWindowBucketCount: 10,
- Threshold: 50,
- },
- })
+ Resource:
"dubbo:consumer:greet.SentinelGreetService:::GreetWithChanceOfError()",
+ Strategy: circuitbreaker.ErrorCount,
+ RetryTimeoutMs: 900,
+ MinRequestAmount: 10,
+ StatIntervalMs: 900,
+ StatSlidingWindowBucketCount: 10,
+ Threshold: 50,
+ }
+ _, err := circuitbreaker.LoadRules([]*circuitbreaker.Rule{rule})
assert.NoError(t, err)
pass, block :=
CallGreetFunConcurrently(greetService.GreetWithChanceOfError, "error", 1, 50)
assert.True(t, pass == 0)
assert.True(t, block == 50)
- select {
- case <-time.After(time.Second):
- t.Error()
- case <-listener.OnTransformToOpenChan:
- }
- // wait half open
- time.Sleep(time.Second)
+ waitForState(t, listener.OnTransformToOpenChan, time.Second, "open")
+ timer :=
time.NewTimer(time.Duration(rule.RetryTimeoutMs)*time.Millisecond +
200*time.Millisecond)
+ <-timer.C
pass, block =
CallGreetFunConcurrently(greetService.GreetWithChanceOfError, "hello", 1, 50)
assert.True(t, pass > 0)
assert.True(t, block < 50)
+ waitForState(t, listener.OnTransformToHalfOpenChan, time.Second,
"half-open")
+ waitForState(t, listener.OnTransformToClosedChan, time.Second, "closed")
+}
+
+func waitForState(t *testing.T, ch <-chan struct{}, timeout time.Duration,
name string) {
+ t.Helper()
select {
- case <-time.After(time.Second):
- t.Error()
- case <-listener.OnTransformToHalfOpenChan:
- }
- select {
- case <-time.After(time.Second):
- t.Error()
- case <-listener.OnTransformToClosedChan:
+ case <-ch:
+ return
+ case <-time.After(timeout):
+ t.Fatalf("wait circuit breaker %s timeout", name)
}
}