This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 842281de1 fix(registry/nacos): add exponential backoff for subscribe retry (#3178)
842281de1 is described below
commit 842281de12d56b428d9adb5c6a2e806a72be7d98
Author: CAICAII <[email protected]>
AuthorDate: Tue Jan 27 16:16:38 2026 +0800
fix(registry/nacos): add exponential backoff for subscribe retry (#3178)
---
registry/nacos/registry.go | 32 +++++++++++++++++++++++++-------
registry/nacos/registry_test.go | 30 ++++++++++++++++++++++++++++++
2 files changed, 55 insertions(+), 7 deletions(-)
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 8c891bcce..b4d59ae17 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -19,6 +19,7 @@ package nacos
import (
"bytes"
+ "context"
"fmt"
"math"
"strconv"
@@ -28,6 +29,8 @@ import (
)
import (
+ "github.com/cenkalti/backoff/v4"
+
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/dubbogo/gost/log/logger"
@@ -200,15 +203,30 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}
func (nr *nacosRegistry) subscribeUntilSuccess(url *common.URL, notifyListener registry.NotifyListener) {
- // retry forever
- for {
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ <-nr.done
+ cancel()
+ }()
+
+ bo := backoff.NewExponentialBackOff()
+ bo.InitialInterval = 1 * time.Second
+ bo.MaxInterval = 30 * time.Second
+ bo.MaxElapsedTime = 0
+
+ operation := func() error {
if !nr.IsAvailable() {
- return
- }
- err := nr.subscribe(getSubscribeName(url), notifyListener)
- if err == nil {
- return
+ return backoff.Permanent(perrors.New("registry unavailable"))
}
+ return nr.subscribe(getSubscribeName(url), notifyListener)
+ }
+
+ notify := func(err error, duration time.Duration) {
+ logger.Warnf("[Nacos Registry] subscribe failed, retry after %v: %v", duration, err)
+ }
+
+ if err := backoff.RetryNotify(operation, backoff.WithContext(bo, ctx), notify); err != nil {
+ logger.Infof("[Nacos Registry] subscribe retry stopped: %v", err)
}
}
diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go
index 9c00c6232..e0b3031c4 100644
--- a/registry/nacos/registry_test.go
+++ b/registry/nacos/registry_test.go
@@ -620,3 +620,33 @@ func TestNacosRegistryCloseListener(t *testing.T) {
t.Errorf("nl.done channel was not closed")
}
}
+
+// TestNacosRegistrySubscribeUntilSuccessWithBackoff tests the exponential backoff retry mechanism
+func TestNacosRegistrySubscribeUntilSuccessWithBackoff(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockNamingClient := NewMockINamingClient(ctrl)
+ nc := &nacosClient.NacosNamingClient{}
+ nc.SetClient(mockNamingClient)
+
+ regURL, _ := common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+ nr := &nacosRegistry{
+ URL: regURL,
+ namingClient: nc,
+ done: make(chan struct{}),
+ registryUrls: []*common.URL{},
+ }
+
+ urlMap := url.Values{}
+ urlMap.Set(constant.RegistryRoleKey, strconv.Itoa(common.CONSUMER))
+ urlMap.Set(constant.InterfaceKey, "com.test.BackoffService")
+ testURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.test.BackoffService", common.WithParams(urlMap))
+
+ // Test case: registry becomes unavailable, should stop retrying immediately
+ t.Run("stops when registry unavailable", func(t *testing.T) {
+ close(nr.done) // simulate registry shutdown
+ nr.subscribeUntilSuccess(testURL, nil)
+ // If we reach here without hanging, the test passes
+ })
+}