This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new ff45d605a fix: when nacos subscribe all service names can not get new 
add sevice (#2859)
ff45d605a is described below

commit ff45d605a8e4694352e6f27a5dc4f340b2838e14
Author: foghost <[email protected]>
AuthorDate: Sun May 11 14:14:27 2025 +0800

    fix: when nacos subscribe all service names can not get new add sevice 
(#2859)
    
    * fix bug: when nacos subscribe all service names can not get new add 
service
    
    * optimize
    
    * optimize
    
    * Update registry.go
    
    ---------
    
    Co-authored-by: Xin.Zh <[email protected]>
---
 registry/nacos/listener.go |   3 +-
 registry/nacos/registry.go | 103 ++++++++++++++++++++++++++++++---------------
 2 files changed, 72 insertions(+), 34 deletions(-)

diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go
index 0507332da..9b6048537 100644
--- a/registry/nacos/listener.go
+++ b/registry/nacos/listener.go
@@ -176,7 +176,8 @@ func (nl *nacosListener) listenService(serviceName string) 
error {
        if nl.namingClient == nil {
                return perrors.New("nacos naming namingClient stopped")
        }
-       nl.subscribeParam = createSubscribeParam(serviceName, nl.regURL, 
nl.Callback)
+       group := nl.regURL.GetParam(constant.RegistryGroupKey, defaultGroup)
+       nl.subscribeParam = createSubscribeParam(serviceName, group, 
nl.Callback)
        if nl.subscribeParam == nil {
                return perrors.New("create nacos subscribeParam failed")
        }
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index d482c063b..7f5ec246b 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -47,7 +47,7 @@ import (
 )
 
 const (
-       RegistryConnDelay = 3
+       LookupInterval = 20 * time.Second
 )
 
 func init() {
@@ -177,46 +177,81 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, 
notifyListener registry.Noti
                return nil
        }
        serviceName := url.GetParam(constant.InterfaceKey, "")
-       var serviceNames []string
-       var err error
        if serviceName == constant.AnyValue {
-               serviceNames, err = nr.getAllSubscribeServiceNames(url)
-               if err != nil {
-                       return err
-               }
+               // sync subscribe all first
+               nr.subscribeAll(url, notifyListener)
+               // scheduled lookup for new service
+               go nr.scheduledLookUp(url, notifyListener)
        } else {
-               serviceNames = []string{getSubscribeName(url)}
+               nr.subscribeUntilSuccess(url, notifyListener)
        }
-       return nr.subscribe(serviceNames, notifyListener)
+       return nil
 }
 
-// subscribe subscribe services
-func (nr *nacosRegistry) subscribe(serviceNames []string, notifyListener 
registry.NotifyListener) error {
-       if len(serviceNames) == 0 {
-               logger.Warnf("No services to listen to.")
-               return nil
-       }
+func (nr *nacosRegistry) subscribeUntilSuccess(url *common.URL, notifyListener 
registry.NotifyListener) {
+       // retry forever
        for {
                if !nr.IsAvailable() {
-                       logger.Warnf("event listener game over.")
-                       return perrors.New("nacosRegistry is not available.")
-               }
-               var err error
-               for _, serviceName := range serviceNames {
-                       listener := 
NewNacosListenerWithServiceName(serviceName, nr.URL, nr.namingClient)
-                       err = listener.listenService(serviceName)
-                       metrics.Publish(metricsRegistry.NewSubscribeEvent(err 
== nil))
-                       if err != nil {
-                               logger.Warnf("getAllServices() = err:%v", 
perrors.WithStack(err))
-                               time.Sleep(time.Duration(RegistryConnDelay) * 
time.Second)
-                               break
-                       }
-                       go nr.handleServiceEvents(listener, notifyListener)
+                       return
                }
+               err := nr.subscribe(getSubscribeName(url), notifyListener)
                if err == nil {
-                       break
+                       return
+               }
+       }
+}
+
+func (nr *nacosRegistry) scheduledLookUp(url *common.URL, notifyListener 
registry.NotifyListener) {
+       for nr.IsAvailable() {
+               nr.subscribeAll(url, notifyListener)
+               time.Sleep(LookupInterval)
+       }
+}
+
+func (nr *nacosRegistry) subscribeAll(url *common.URL, notifyListener 
registry.NotifyListener) {
+       groupName := nr.URL.GetParam(constant.RegistryGroupKey, defaultGroup)
+       serviceNames, err := nr.getAllSubscribeServiceNames(url)
+       if err != nil {
+               logger.Warnf("getAllServices() = err:%v", 
perrors.WithStack(err))
+               return
+       }
+       if len(serviceNames) == 0 {
+               logger.Warnf("No services to listen to.")
+               return
+       }
+       for _, name := range serviceNames {
+               if _, ok := listenerCache.Load(name + groupName); ok {
+                       // has subscribed ,ignore
+                       continue
+               }
+               // new service
+               err = nr.subscribe(name, notifyListener)
+               if err != nil {
+                       logger.Warnf("subscribe service %s err:%v", name, 
perrors.WithStack(err))
                }
        }
+}
+
+// subscribe subscribe services
+func (nr *nacosRegistry) subscribe(serviceName string, notifyListener 
registry.NotifyListener) error {
+       if len(serviceName) == 0 {
+               logger.Warnf("can not subscribe because service name is empty")
+               return nil
+       }
+       if !nr.IsAvailable() {
+               logger.Warnf("event listener game over.")
+               return perrors.New("nacosRegistry is not available.")
+       }
+       listener := NewNacosListenerWithServiceName(serviceName, nr.URL, 
nr.namingClient)
+       // will add to listenerCache when subscribe success
+       err := listener.listenService(serviceName)
+       metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
+       if err != nil {
+               logger.Warnf("subscribe service %s err:%v", serviceName, 
perrors.WithStack(err))
+               return err
+       }
+       // handleServiceEvents will block to wait notify event and exit when 
error occur
+       go nr.handleServiceEvents(listener, notifyListener)
        return nil
 }
 
@@ -251,6 +286,9 @@ func (nr *nacosRegistry) getAllSubscribeServiceNames(url 
*common.URL) ([]string,
 func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, 
notifyListener registry.NotifyListener) {
        for {
                serviceEvent, err := listener.Next()
+               if !nr.IsAvailable() {
+                       return
+               }
                if err != nil {
                        logger.Warnf("Selector.watch() = error{%v}", 
perrors.WithStack(err))
                        listener.Close()
@@ -263,7 +301,7 @@ func (nr *nacosRegistry) handleServiceEvents(listener 
registry.Listener, notifyL
 
 // UnSubscribe :
 func (nr *nacosRegistry) UnSubscribe(url *common.URL, _ 
registry.NotifyListener) error {
-       param := createSubscribeParam(getSubscribeName(url), nr.URL, nil)
+       param := createSubscribeParam(getSubscribeName(url), 
nr.URL.GetParam(constant.RegistryGroupKey, defaultGroup), nil)
        if param == nil {
                return nil
        }
@@ -295,8 +333,7 @@ func (nr *nacosRegistry) LoadSubscribeInstances(url 
*common.URL, notify registry
        return nil
 }
 
-func createSubscribeParam(serviceName string, regUrl *common.URL, cb callback) 
*vo.SubscribeParam {
-       groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
+func createSubscribeParam(serviceName string, groupName string, cb callback) 
*vo.SubscribeParam {
        if cb == nil {
                v, ok := listenerCache.Load(serviceName + groupName)
                if !ok {

Reply via email to