This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new ff45d605a fix: when nacos subscribe all service names can not get new
add sevice (#2859)
ff45d605a is described below
commit ff45d605a8e4694352e6f27a5dc4f340b2838e14
Author: foghost <[email protected]>
AuthorDate: Sun May 11 14:14:27 2025 +0800
fix: when nacos subscribe all service names can not get new add sevice
(#2859)
* fix bug: when nacos subscribe all service names can not get new add
service
* optimize
* optimize
* Update registry.go
---------
Co-authored-by: Xin.Zh <[email protected]>
---
registry/nacos/listener.go | 3 +-
registry/nacos/registry.go | 103 ++++++++++++++++++++++++++++++---------------
2 files changed, 72 insertions(+), 34 deletions(-)
diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go
index 0507332da..9b6048537 100644
--- a/registry/nacos/listener.go
+++ b/registry/nacos/listener.go
@@ -176,7 +176,8 @@ func (nl *nacosListener) listenService(serviceName string)
error {
if nl.namingClient == nil {
return perrors.New("nacos naming namingClient stopped")
}
- nl.subscribeParam = createSubscribeParam(serviceName, nl.regURL,
nl.Callback)
+ group := nl.regURL.GetParam(constant.RegistryGroupKey, defaultGroup)
+ nl.subscribeParam = createSubscribeParam(serviceName, group,
nl.Callback)
if nl.subscribeParam == nil {
return perrors.New("create nacos subscribeParam failed")
}
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index d482c063b..7f5ec246b 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -47,7 +47,7 @@ import (
)
const (
- RegistryConnDelay = 3
+ LookupInterval = 20 * time.Second
)
func init() {
@@ -177,46 +177,81 @@ func (nr *nacosRegistry) Subscribe(url *common.URL,
notifyListener registry.Noti
return nil
}
serviceName := url.GetParam(constant.InterfaceKey, "")
- var serviceNames []string
- var err error
if serviceName == constant.AnyValue {
- serviceNames, err = nr.getAllSubscribeServiceNames(url)
- if err != nil {
- return err
- }
+ // sync subscribe all first
+ nr.subscribeAll(url, notifyListener)
+ // scheduled lookup for new service
+ go nr.scheduledLookUp(url, notifyListener)
} else {
- serviceNames = []string{getSubscribeName(url)}
+ nr.subscribeUntilSuccess(url, notifyListener)
}
- return nr.subscribe(serviceNames, notifyListener)
+ return nil
}
-// subscribe subscribe services
-func (nr *nacosRegistry) subscribe(serviceNames []string, notifyListener
registry.NotifyListener) error {
- if len(serviceNames) == 0 {
- logger.Warnf("No services to listen to.")
- return nil
- }
+func (nr *nacosRegistry) subscribeUntilSuccess(url *common.URL, notifyListener
registry.NotifyListener) {
+ // retry forever
for {
if !nr.IsAvailable() {
- logger.Warnf("event listener game over.")
- return perrors.New("nacosRegistry is not available.")
- }
- var err error
- for _, serviceName := range serviceNames {
- listener :=
NewNacosListenerWithServiceName(serviceName, nr.URL, nr.namingClient)
- err = listener.listenService(serviceName)
- metrics.Publish(metricsRegistry.NewSubscribeEvent(err
== nil))
- if err != nil {
- logger.Warnf("getAllServices() = err:%v",
perrors.WithStack(err))
- time.Sleep(time.Duration(RegistryConnDelay) *
time.Second)
- break
- }
- go nr.handleServiceEvents(listener, notifyListener)
+ return
}
+ err := nr.subscribe(getSubscribeName(url), notifyListener)
if err == nil {
- break
+ return
+ }
+ }
+}
+
+func (nr *nacosRegistry) scheduledLookUp(url *common.URL, notifyListener
registry.NotifyListener) {
+ for nr.IsAvailable() {
+ nr.subscribeAll(url, notifyListener)
+ time.Sleep(LookupInterval)
+ }
+}
+
+func (nr *nacosRegistry) subscribeAll(url *common.URL, notifyListener
registry.NotifyListener) {
+ groupName := nr.URL.GetParam(constant.RegistryGroupKey, defaultGroup)
+ serviceNames, err := nr.getAllSubscribeServiceNames(url)
+ if err != nil {
+ logger.Warnf("getAllServices() = err:%v",
perrors.WithStack(err))
+ return
+ }
+ if len(serviceNames) == 0 {
+ logger.Warnf("No services to listen to.")
+ return
+ }
+ for _, name := range serviceNames {
+ if _, ok := listenerCache.Load(name + groupName); ok {
+ // has subscribed ,ignore
+ continue
+ }
+ // new service
+ err = nr.subscribe(name, notifyListener)
+ if err != nil {
+ logger.Warnf("subscribe service %s err:%v", name,
perrors.WithStack(err))
}
}
+}
+
+// subscribe subscribe services
+func (nr *nacosRegistry) subscribe(serviceName string, notifyListener
registry.NotifyListener) error {
+ if len(serviceName) == 0 {
+ logger.Warnf("can not subscribe because service name is empty")
+ return nil
+ }
+ if !nr.IsAvailable() {
+ logger.Warnf("event listener game over.")
+ return perrors.New("nacosRegistry is not available.")
+ }
+ listener := NewNacosListenerWithServiceName(serviceName, nr.URL,
nr.namingClient)
+ // will add to listenerCache when subscribe success
+ err := listener.listenService(serviceName)
+ metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
+ if err != nil {
+ logger.Warnf("subscribe service %s err:%v", serviceName,
perrors.WithStack(err))
+ return err
+ }
+ // handleServiceEvents will block to wait notify event and exit when
error occur
+ go nr.handleServiceEvents(listener, notifyListener)
return nil
}
@@ -251,6 +286,9 @@ func (nr *nacosRegistry) getAllSubscribeServiceNames(url
*common.URL) ([]string,
func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener,
notifyListener registry.NotifyListener) {
for {
serviceEvent, err := listener.Next()
+ if !nr.IsAvailable() {
+ return
+ }
if err != nil {
logger.Warnf("Selector.watch() = error{%v}",
perrors.WithStack(err))
listener.Close()
@@ -263,7 +301,7 @@ func (nr *nacosRegistry) handleServiceEvents(listener
registry.Listener, notifyL
// UnSubscribe :
func (nr *nacosRegistry) UnSubscribe(url *common.URL, _
registry.NotifyListener) error {
- param := createSubscribeParam(getSubscribeName(url), nr.URL, nil)
+ param := createSubscribeParam(getSubscribeName(url),
nr.URL.GetParam(constant.RegistryGroupKey, defaultGroup), nil)
if param == nil {
return nil
}
@@ -295,8 +333,7 @@ func (nr *nacosRegistry) LoadSubscribeInstances(url
*common.URL, notify registry
return nil
}
-func createSubscribeParam(serviceName string, regUrl *common.URL, cb callback)
*vo.SubscribeParam {
- groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
+func createSubscribeParam(serviceName string, groupName string, cb callback)
*vo.SubscribeParam {
if cb == nil {
v, ok := listenerCache.Load(serviceName + groupName)
if !ok {