This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new f50ba61c4 Fix: handle logical dead loop (#2879)
f50ba61c4 is described below

commit f50ba61c4ccbe8910074ef998f8069faea3bb8c5
Author: 1kasa <[email protected]>
AuthorDate: Thu May 29 15:38:27 2025 +0800

    Fix: handle logical dead loop (#2879)
    
    * handle logical dead loop
    
    * Unified naming style
    
    * Unified naming style
    
    * remove nolint
    
    * optimization
    
    * optimization
    
    * feat a concurrent closed testcase
    
    * hanndle dead loop
    
    * hanndle dead loop
    
    * hanndle wrong commit
    
    * add uint test Comment
---
 registry/nacos/listener.go      |   7 +-
 registry/nacos/registry.go      |  88 +++++++++++++--
 registry/nacos/registry_test.go | 240 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 323 insertions(+), 12 deletions(-)

diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go
index 9b6048537..46d994160 100644
--- a/registry/nacos/listener.go
+++ b/registry/nacos/listener.go
@@ -59,6 +59,7 @@ type nacosListener struct {
        cacheLock      sync.Mutex
        done           chan struct{}
        subscribeParam *vo.SubscribeParam
+       once           sync.Once
 }
 
 // NewNacosListenerWithServiceName creates a data listener for nacos
@@ -214,6 +215,8 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, 
error) {
 
 // nolint
 func (nl *nacosListener) Close() {
-       _ = nl.stopListen()
-       close(nl.done)
+       nl.once.Do(func() {
+               _ = nl.stopListen()
+               close(nl.done)
+       })
 }
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 1eb0d39d0..8c891bcce 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -23,6 +23,7 @@ import (
        "math"
        "strconv"
        "strings"
+       "sync"
        "time"
 )
 
@@ -48,6 +49,7 @@ import (
 
 const (
        LookupInterval = 20 * time.Second
+       checkInterval  = 5 * time.Second
 )
 
 func init() {
@@ -58,6 +60,15 @@ type nacosRegistry struct {
        *common.URL
        namingClient *nacosClient.NacosNamingClient
        registryUrls []*common.URL
+       done         chan struct{}
+       availability availabilityCache
+       wg           sync.WaitGroup
+}
+
+type availabilityCache struct {
+       mu            sync.Mutex
+       lastAvailable bool
+       lastCheckTime time.Time
 }
 
 func getCategory(url *common.URL) string {
@@ -251,7 +262,11 @@ func (nr *nacosRegistry) subscribe(serviceName string, 
notifyListener registry.N
                return err
        }
        // handleServiceEvents will block to wait notify event and exit when 
error occur
-       go nr.handleServiceEvents(listener, notifyListener)
+       nr.wg.Add(1)
+       go func() {
+               defer nr.wg.Done()
+               nr.handleServiceEvents(listener, notifyListener)
+       }()
        return nil
 }
 
@@ -284,16 +299,18 @@ func (nr *nacosRegistry) getAllSubscribeServiceNames(url 
*common.URL) ([]string,
 
 // handleServiceEvents receives service events from the listener and notifies 
the notifyListener
 func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, 
notifyListener registry.NotifyListener) {
+       defer listener.Close()
        for {
-               serviceEvent, err := listener.Next()
                if !nr.IsAvailable() {
                        return
                }
+
+               serviceEvent, err := listener.Next()
                if err != nil {
                        logger.Warnf("Selector.watch() = error{%v}", 
perrors.WithStack(err))
-                       listener.Close()
                        return
                }
+
                logger.Infof("[Nacos Registry] Update begin, service event: 
%v", serviceEvent.String())
                notifyListener.Notify(serviceEvent)
        }
@@ -359,12 +376,49 @@ func (nr *nacosRegistry) GetURL() *common.URL {
 
 // IsAvailable determines nacos registry center whether it is available
 func (nr *nacosRegistry) IsAvailable() bool {
-       // TODO
-       return true
+       // Considering both local state + server state
+       select {
+       case <-nr.done:
+               return false
+       default:
+       }
+
+       ac := &nr.availability
+       ac.mu.Lock()
+       defer ac.mu.Unlock()
+
+       if time.Since(ac.lastCheckTime) < checkInterval {
+               return ac.lastAvailable
+       }
+
+       ac.lastCheckTime = time.Now()
+
+       if nr.namingClient == nil || nr.namingClient.Client() == nil {
+               ac.lastAvailable = false
+               return false
+       }
+
+       _, err := 
nr.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
+               GroupName: nr.GetParam(constant.RegistryGroupKey, defaultGroup),
+               PageNo:    1,
+               PageSize:  1,
+       })
+       ac.lastAvailable = err == nil
+       return ac.lastAvailable
 }
 
-// nolint
 func (nr *nacosRegistry) Destroy() {
+       nr.CloseListener()
+
+       // Prevent close() from being called multiple times, causing panic
+       select {
+       case <-nr.done:
+       default:
+               close(nr.done)
+       }
+
+       nr.wg.Wait()
+
        for _, url := range nr.registryUrls {
                err := nr.UnRegister(url)
                logger.Infof("DeRegister Nacos URL:%+v", url)
@@ -372,7 +426,9 @@ func (nr *nacosRegistry) Destroy() {
                        logger.Errorf("Deregister URL:%+v err:%v", url, 
err.Error())
                }
        }
-       return
+
+       nr.registryUrls = nil
+       nr.CloseAndNilClient()
 }
 
 // newNacosRegistry will create new instance
@@ -394,6 +450,24 @@ func newNacosRegistry(url *common.URL) (registry.Registry, 
error) {
                URL:          url, // registry.group is recorded at this url
                namingClient: namingClient,
                registryUrls: []*common.URL{},
+               done:         make(chan struct{}),
        }
        return tmpRegistry, nil
 }
+
+func (nr *nacosRegistry) CloseListener() {
+       listenerCache.Range(func(key, value any) bool {
+               if listener, ok := value.(*nacosListener); ok {
+                       listener.Close()
+               }
+               listenerCache.Delete(key)
+               return true
+       })
+}
+
+func (nr *nacosRegistry) CloseAndNilClient() {
+       if nr.namingClient != nil && nr.namingClient.Client() != nil {
+               nr.namingClient.Client().CloseClient()
+               nr.namingClient = nil
+       }
+}
diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go
index 25dd41052..928d46501 100644
--- a/registry/nacos/registry_test.go
+++ b/registry/nacos/registry_test.go
@@ -21,6 +21,8 @@ import (
        "net/url"
        "reflect"
        "strconv"
+       "strings"
+       "sync"
        "testing"
 )
 
@@ -220,7 +222,7 @@ func newNacosRegistryForTest(f fields) *nacosRegistry {
        }
 }
 
-func Test_nacosRegistry_Register(t *testing.T) {
+func TestNacosRegistryRegister(t *testing.T) {
        params := url.Values{}
        params.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
        params.Set(constant.NacosNotLoadLocalCache, "true")
@@ -272,7 +274,7 @@ func Test_nacosRegistry_Register(t *testing.T) {
        }
 }
 
-func Test_nacosRegistry_UnRegister(t *testing.T) {
+func TestNacosRegistryUnRegister(t *testing.T) {
        params := url.Values{}
        params.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
        params.Set(constant.NacosNotLoadLocalCache, "true")
@@ -324,7 +326,7 @@ func Test_nacosRegistry_UnRegister(t *testing.T) {
        }
 }
 
-func Test_nacosRegistry_Subscribe(t *testing.T) {
+func TestNacosRegistrySubscribe(t *testing.T) {
        params := url.Values{}
        params.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
        params.Set(constant.NacosNotLoadLocalCache, "true")
@@ -374,3 +376,235 @@ func Test_nacosRegistry_Subscribe(t *testing.T) {
                })
        }
 }
+
+// TestNacosRegistryDestroy Tests the Destroy method of NacosRegistry
+func TestNacosRegistryDestroy(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+
+       // Create a mock object
+       mockNamingClient := NewMockINamingClient(ctrl)
+       nc := &nacosClient.NacosNamingClient{}
+       nc.SetClient(mockNamingClient)
+
+       // Initialize the NacosRegistry object
+       regURL, _ := common.NewURL("registry://127.0.0.1:8848")
+       nr := &nacosRegistry{
+               URL:          regURL,
+               namingClient: nc,
+               done:         make(chan struct{}),
+               registryUrls: []*common.URL{},
+       }
+
+       //Simulate the registered service URL
+       serviceURL1, _ := 
common.NewURL("dubbo://127.0.0.1:20001/com.example.Service1?interface=com.example.Service1&group=test&version=1.0.0")
+       serviceURL2, _ := 
common.NewURL("dubbo://127.0.0.1:20002/com.example.Service2?interface=com.example.Service2&group=test&version=1.0.0")
+
+       // Add the service URL to the registry
+       nr.registryUrls = append(nr.registryUrls, serviceURL1)
+       nr.registryUrls = append(nr.registryUrls, serviceURL2)
+
+       // Add a mock listener to listenerCache
+       serviceName := "com.example.TestService"
+       nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+       subscribeParam := &vo.SubscribeParam{
+               ServiceName:       serviceName,
+               GroupName:         "testgroup",
+               SubscribeCallback: nl.Callback,
+       }
+       nl.subscribeParam = subscribeParam
+       listenerCache.Store(serviceName+"testgroup", nl)
+
+       // Simulate unsubscribe and unregister instances
+       mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Return(nil)
+       
mockNamingClient.EXPECT().DeregisterInstance(gomock.Any()).Times(len(nr.registryUrls)).Return(true,
 nil)
+
+       // Use goroutine to wait for nr.done channel to close
+       var wg sync.WaitGroup
+       wg.Add(1)
+
+       go func() {
+               defer wg.Done()
+               <-nr.done
+               t.Log("nr.done channel closed")
+       }()
+
+       nr.Destroy()
+
+       wg.Wait()
+
+       // Check if namingClient and listenerCache are cleaned up
+       if nr.namingClient != nil {
+               t.Errorf("namingClient was not set to nil")
+       }
+
+       if _, ok := listenerCache.Load(serviceName + "testgroup"); ok {
+               t.Errorf("listenerCache was not cleared")
+       }
+}
+
+func TestNacosListenerClose(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+
+       mockNamingClient := NewMockINamingClient(ctrl)
+       nc := &nacosClient.NacosNamingClient{}
+       nc.SetClient(mockNamingClient)
+
+       regURL, _ := 
common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+       serviceName := "com.example.TestService"
+
+       nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+
+       subscribeParam := &vo.SubscribeParam{
+               ServiceName:       serviceName,
+               GroupName:         "testgroup",
+               SubscribeCallback: nl.Callback,
+       }
+       nl.subscribeParam = subscribeParam
+
+       // Set the `Unsubscribe` method of the expected mock object 
`mockNamingClient`. The `Unsubscribe` method should be called exactly once.
+       
mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Times(1).Return(nil)
+
+       nl.Close()
+
+       // Check if the nl.done channel is closed
+       select {
+       case <-nl.done:
+       default:
+               t.Errorf("nl.done channel was not closed after Close()")
+       }
+}
+
+// TestNacosListenerNextAfterClose tests the behavior of NacosListener calling 
the Next method after Close
+func TestNacosListenerNextAfterClose(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+
+       mockNamingClient := NewMockINamingClient(ctrl)
+       nc := &nacosClient.NacosNamingClient{}
+       nc.SetClient(mockNamingClient)
+
+       regURL, _ := common.NewURL("registry://127.0.0.1:8848")
+       serviceName := "com.example.AnotherService"
+
+       nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+       mockNamingClient.EXPECT().Unsubscribe(gomock.Any()).Times(1)
+
+       nl.Close()
+
+       // Call the Next method to verify whether an error is returned
+       event, err := nl.Next()
+
+       if err == nil {
+               t.Errorf("Expected error from Next() after Close(), but got 
nil")
+       } else {
+               expectedErrorMsg := "listener stopped"
+               if !strings.Contains(err.Error(), expectedErrorMsg) {
+                       t.Errorf("Expected error message to contain '%s', but 
got '%s'", expectedErrorMsg, err.Error())
+               }
+       }
+
+       // Check if Next returns a nil event
+       if event != nil {
+               t.Errorf("Expected nil event from Next() after Close(), but 
got: %+v", event)
+       }
+}
+
+// TestNacosListenerCloseConcurrent tests the concurrent calls to the Close 
method of NacosListener
+func TestNacosListenerCloseConcurrent(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+
+       mockNamingClient := NewMockINamingClient(ctrl)
+       nc := &nacosClient.NacosNamingClient{}
+       nc.SetClient(mockNamingClient)
+
+       regURL, _ := 
common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+       serviceName := "com.example.ConcurrentTestService"
+
+       nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+       subscribeParam := &vo.SubscribeParam{
+               ServiceName:       serviceName,
+               GroupName:         "testgroup",
+               SubscribeCallback: nl.Callback,
+       }
+
+       // Set the `Unsubscribe` method of the expected mock object 
`mockNamingClient`. The `Unsubscribe` method should be called exactly once.
+       nl.subscribeParam = subscribeParam
+       
mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Times(1).Return(nil)
+
+       var (
+               wg            sync.WaitGroup
+               numGoroutines = 10
+               startSignal   = make(chan struct{})
+       )
+
+       // Start multiple goroutines to test concurrent calls to the Close 
method
+       for i := 0; i < numGoroutines; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       <-startSignal
+                       nl.Close()
+               }()
+       }
+
+       //Send a signal after starting goroutine
+       close(startSignal)
+       t.Logf("Signaled %d goroutines to start NacosListener Close", 
numGoroutines)
+
+       wg.Wait()
+
+       // Check if the nl.done channel is closed
+       select {
+       case _, ok := <-nl.done:
+               if ok {
+                       t.Errorf("nl.done channel was not closed after Close()")
+               }
+       default:
+               t.Log("nl.done channel was closed after Close() as expected")
+       }
+       t.Logf("NacosListener Close call completed successfully")
+}
+
+// TestNacosRegistryCloseListener tests the CloseListener method of 
NacosRegistry
+func TestNacosRegistryCloseListener(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+
+       mockNamingClient := NewMockINamingClient(ctrl)
+       nc := &nacosClient.NacosNamingClient{}
+       nc.SetClient(mockNamingClient)
+
+       regURL, _ := 
common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+       serviceName := "com.example.TestService"
+
+       //Simulate the behavior of registering and closing listeners
+       nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+       subscribeParam := &vo.SubscribeParam{
+               ServiceName:       serviceName,
+               GroupName:         "testgroup",
+               SubscribeCallback: nl.Callback,
+       }
+       nl.subscribeParam = subscribeParam
+       listenerCache.Store(serviceName+"testgroup", nl)
+
+       mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Return(nil)
+
+       // Call the CloseListener method
+       nr := &nacosRegistry{URL: regURL, namingClient: nc}
+       nr.CloseListener()
+
+       // Verify whether to clear the entries in the listenerCache
+       if _, ok := listenerCache.Load(serviceName + "testgroup"); ok {
+               t.Errorf("listenerCache was not cleared")
+       }
+
+       select {
+       case <-nl.done:
+               t.Log("nl.done channel closed successfully")
+       default:
+               t.Errorf("nl.done channel was not closed")
+       }
+}

Reply via email to