This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new f50ba61c4 Fix: handle logical dead loop (#2879)
f50ba61c4 is described below
commit f50ba61c4ccbe8910074ef998f8069faea3bb8c5
Author: 1kasa <[email protected]>
AuthorDate: Thu May 29 15:38:27 2025 +0800
Fix: handle logical dead loop (#2879)
* handle logical dead loop
* Unified naming style
* Unified naming style
* remove nolint
* optimization
* optimization
* feat a concurrent closed testcase
* hanndle dead loop
* hanndle dead loop
* hanndle wrong commit
* add uint test Comment
---
registry/nacos/listener.go | 7 +-
registry/nacos/registry.go | 88 +++++++++++++--
registry/nacos/registry_test.go | 240 +++++++++++++++++++++++++++++++++++++++-
3 files changed, 323 insertions(+), 12 deletions(-)
diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go
index 9b6048537..46d994160 100644
--- a/registry/nacos/listener.go
+++ b/registry/nacos/listener.go
@@ -59,6 +59,7 @@ type nacosListener struct {
cacheLock sync.Mutex
done chan struct{}
subscribeParam *vo.SubscribeParam
+ once sync.Once
}
// NewNacosListenerWithServiceName creates a data listener for nacos
@@ -214,6 +215,8 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
// nolint
func (nl *nacosListener) Close() {
- _ = nl.stopListen()
- close(nl.done)
+ nl.once.Do(func() {
+ _ = nl.stopListen()
+ close(nl.done)
+ })
}
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 1eb0d39d0..8c891bcce 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -23,6 +23,7 @@ import (
"math"
"strconv"
"strings"
+ "sync"
"time"
)
@@ -48,6 +49,7 @@ import (
const (
LookupInterval = 20 * time.Second
+ checkInterval = 5 * time.Second
)
func init() {
@@ -58,6 +60,15 @@ type nacosRegistry struct {
*common.URL
namingClient *nacosClient.NacosNamingClient
registryUrls []*common.URL
+ done chan struct{}
+ availability availabilityCache
+ wg sync.WaitGroup
+}
+
+type availabilityCache struct {
+ mu sync.Mutex
+ lastAvailable bool
+ lastCheckTime time.Time
}
func getCategory(url *common.URL) string {
@@ -251,7 +262,11 @@ func (nr *nacosRegistry) subscribe(serviceName string, notifyListener registry.N
return err
}
// handleServiceEvents will block to wait notify event and exit when error occur
- go nr.handleServiceEvents(listener, notifyListener)
+ nr.wg.Add(1)
+ go func() {
+ defer nr.wg.Done()
+ nr.handleServiceEvents(listener, notifyListener)
+ }()
return nil
}
@@ -284,16 +299,18 @@ func (nr *nacosRegistry) getAllSubscribeServiceNames(url *common.URL) ([]string,
// handleServiceEvents receives service events from the listener and notifies the notifyListener
func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, notifyListener registry.NotifyListener) {
+ defer listener.Close()
for {
- serviceEvent, err := listener.Next()
if !nr.IsAvailable() {
return
}
+
+ serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
- listener.Close()
return
}
+
logger.Infof("[Nacos Registry] Update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
@@ -359,12 +376,49 @@ func (nr *nacosRegistry) GetURL() *common.URL {
// IsAvailable determines nacos registry center whether it is available
func (nr *nacosRegistry) IsAvailable() bool {
- // TODO
- return true
+ // Considering both local state + server state
+ select {
+ case <-nr.done:
+ return false
+ default:
+ }
+
+ ac := &nr.availability
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+
+ if time.Since(ac.lastCheckTime) < checkInterval {
+ return ac.lastAvailable
+ }
+
+ ac.lastCheckTime = time.Now()
+
+ if nr.namingClient == nil || nr.namingClient.Client() == nil {
+ ac.lastAvailable = false
+ return false
+ }
+
+ _, err := nr.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
+ GroupName: nr.GetParam(constant.RegistryGroupKey, defaultGroup),
+ PageNo: 1,
+ PageSize: 1,
+ })
+ ac.lastAvailable = err == nil
+ return ac.lastAvailable
}
-// nolint
func (nr *nacosRegistry) Destroy() {
+ nr.CloseListener()
+
+ // Prevent close() from being called multiple times, causing panic
+ select {
+ case <-nr.done:
+ default:
+ close(nr.done)
+ }
+
+ nr.wg.Wait()
+
for _, url := range nr.registryUrls {
err := nr.UnRegister(url)
logger.Infof("DeRegister Nacos URL:%+v", url)
@@ -372,7 +426,9 @@ func (nr *nacosRegistry) Destroy() {
logger.Errorf("Deregister URL:%+v err:%v", url, err.Error())
}
}
- return
+
+ nr.registryUrls = nil
+ nr.CloseAndNilClient()
}
// newNacosRegistry will create new instance
@@ -394,6 +450,24 @@ func newNacosRegistry(url *common.URL) (registry.Registry, error) {
URL: url, // registry.group is recorded at this url
namingClient: namingClient,
registryUrls: []*common.URL{},
+ done: make(chan struct{}),
}
return tmpRegistry, nil
}
+
+func (nr *nacosRegistry) CloseListener() {
+ listenerCache.Range(func(key, value any) bool {
+ if listener, ok := value.(*nacosListener); ok {
+ listener.Close()
+ }
+ listenerCache.Delete(key)
+ return true
+ })
+}
+
+func (nr *nacosRegistry) CloseAndNilClient() {
+ if nr.namingClient != nil && nr.namingClient.Client() != nil {
+ nr.namingClient.Client().CloseClient()
+ nr.namingClient = nil
+ }
+}
diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go
index 25dd41052..928d46501 100644
--- a/registry/nacos/registry_test.go
+++ b/registry/nacos/registry_test.go
@@ -21,6 +21,8 @@ import (
"net/url"
"reflect"
"strconv"
+ "strings"
+ "sync"
"testing"
)
@@ -220,7 +222,7 @@ func newNacosRegistryForTest(f fields) *nacosRegistry {
}
}
-func Test_nacosRegistry_Register(t *testing.T) {
+func TestNacosRegistryRegister(t *testing.T) {
params := url.Values{}
params.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
params.Set(constant.NacosNotLoadLocalCache, "true")
@@ -272,7 +274,7 @@ func Test_nacosRegistry_Register(t *testing.T) {
}
}
-func Test_nacosRegistry_UnRegister(t *testing.T) {
+func TestNacosRegistryUnRegister(t *testing.T) {
params := url.Values{}
params.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
params.Set(constant.NacosNotLoadLocalCache, "true")
@@ -324,7 +326,7 @@ func Test_nacosRegistry_UnRegister(t *testing.T) {
}
}
-func Test_nacosRegistry_Subscribe(t *testing.T) {
+func TestNacosRegistrySubscribe(t *testing.T) {
params := url.Values{}
params.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
params.Set(constant.NacosNotLoadLocalCache, "true")
@@ -374,3 +376,235 @@ func Test_nacosRegistry_Subscribe(t *testing.T) {
})
}
}
+
+// TestNacosRegistryDestroy Tests the Destroy method of NacosRegistry
+func TestNacosRegistryDestroy(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ // Create a mock object
+ mockNamingClient := NewMockINamingClient(ctrl)
+ nc := &nacosClient.NacosNamingClient{}
+ nc.SetClient(mockNamingClient)
+
+ // Initialize the NacosRegistry object
+ regURL, _ := common.NewURL("registry://127.0.0.1:8848")
+ nr := &nacosRegistry{
+ URL: regURL,
+ namingClient: nc,
+ done: make(chan struct{}),
+ registryUrls: []*common.URL{},
+ }
+
+ //Simulate the registered service URL
+ serviceURL1, _ := common.NewURL("dubbo://127.0.0.1:20001/com.example.Service1?interface=com.example.Service1&group=test&version=1.0.0")
+ serviceURL2, _ := common.NewURL("dubbo://127.0.0.1:20002/com.example.Service2?interface=com.example.Service2&group=test&version=1.0.0")
+
+ // Add the service URL to the registry
+ nr.registryUrls = append(nr.registryUrls, serviceURL1)
+ nr.registryUrls = append(nr.registryUrls, serviceURL2)
+
+ // Add a mock listener to listenerCache
+ serviceName := "com.example.TestService"
+ nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+ subscribeParam := &vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: "testgroup",
+ SubscribeCallback: nl.Callback,
+ }
+ nl.subscribeParam = subscribeParam
+ listenerCache.Store(serviceName+"testgroup", nl)
+
+ // Simulate unsubscribe and unregister instances
+ mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Return(nil)
+ mockNamingClient.EXPECT().DeregisterInstance(gomock.Any()).Times(len(nr.registryUrls)).Return(true, nil)
+
+ // Use goroutine to wait for nr.done channel to close
+ var wg sync.WaitGroup
+ wg.Add(1)
+
+ go func() {
+ defer wg.Done()
+ <-nr.done
+ t.Log("nr.done channel closed")
+ }()
+
+ nr.Destroy()
+
+ wg.Wait()
+
+ // Check if namingClient and listenerCache are cleaned up
+ if nr.namingClient != nil {
+ t.Errorf("namingClient was not set to nil")
+ }
+
+ if _, ok := listenerCache.Load(serviceName + "testgroup"); ok {
+ t.Errorf("listenerCache was not cleared")
+ }
+}
+
+func TestNacosListenerClose(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockNamingClient := NewMockINamingClient(ctrl)
+ nc := &nacosClient.NacosNamingClient{}
+ nc.SetClient(mockNamingClient)
+
+ regURL, _ := common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+ serviceName := "com.example.TestService"
+
+ nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+
+ subscribeParam := &vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: "testgroup",
+ SubscribeCallback: nl.Callback,
+ }
+ nl.subscribeParam = subscribeParam
+
+ // Set the `Unsubscribe` method of the expected mock object `mockNamingClient`. The `Unsubscribe` method should be called exactly once.
+ mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Times(1).Return(nil)
+
+ nl.Close()
+
+ // Check if the nl.done channel is closed
+ select {
+ case <-nl.done:
+ default:
+ t.Errorf("nl.done channel was not closed after Close()")
+ }
+}
+
+// TestNacosListenerNextAfterClose tests the behavior of NacosListener calling the Next method after Close
+func TestNacosListenerNextAfterClose(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockNamingClient := NewMockINamingClient(ctrl)
+ nc := &nacosClient.NacosNamingClient{}
+ nc.SetClient(mockNamingClient)
+
+ regURL, _ := common.NewURL("registry://127.0.0.1:8848")
+ serviceName := "com.example.AnotherService"
+
+ nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+ mockNamingClient.EXPECT().Unsubscribe(gomock.Any()).Times(1)
+
+ nl.Close()
+
+ // Call the Next method to verify whether an error is returned
+ event, err := nl.Next()
+
+ if err == nil {
+ t.Errorf("Expected error from Next() after Close(), but got nil")
+ } else {
+ expectedErrorMsg := "listener stopped"
+ if !strings.Contains(err.Error(), expectedErrorMsg) {
+ t.Errorf("Expected error message to contain '%s', but got '%s'", expectedErrorMsg, err.Error())
+ }
+ }
+
+ // Check if Next returns a nil event
+ if event != nil {
+ t.Errorf("Expected nil event from Next() after Close(), but got: %+v", event)
+ }
+}
+
+// TestNacosListenerCloseConcurrent tests the concurrent calls to the Close method of NacosListener
+func TestNacosListenerCloseConcurrent(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockNamingClient := NewMockINamingClient(ctrl)
+ nc := &nacosClient.NacosNamingClient{}
+ nc.SetClient(mockNamingClient)
+
+ regURL, _ := common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+ serviceName := "com.example.ConcurrentTestService"
+
+ nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+ subscribeParam := &vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: "testgroup",
+ SubscribeCallback: nl.Callback,
+ }
+
+ // Set the `Unsubscribe` method of the expected mock object `mockNamingClient`. The `Unsubscribe` method should be called exactly once.
+ nl.subscribeParam = subscribeParam
+ mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Times(1).Return(nil)
+
+ var (
+ wg sync.WaitGroup
+ numGoroutines = 10
+ startSignal = make(chan struct{})
+ )
+
+ // Start multiple goroutines to test concurrent calls to the Close method
+ for i := 0; i < numGoroutines; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-startSignal
+ nl.Close()
+ }()
+ }
+
+ //Send a signal after starting goroutine
+ close(startSignal)
+ t.Logf("Signaled %d goroutines to start NacosListener Close", numGoroutines)
+
+ wg.Wait()
+
+ // Check if the nl.done channel is closed
+ select {
+ case _, ok := <-nl.done:
+ if ok {
+ t.Errorf("nl.done channel was not closed after Close()")
+ }
+ default:
+ t.Log("nl.done channel was closed after Close() as expected")
+ }
+ t.Logf("NacosListener Close call completed successfully")
+}
+
+// TestNacosRegistryCloseListener tests the CloseListener method of NacosRegistry
+func TestNacosRegistryCloseListener(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockNamingClient := NewMockINamingClient(ctrl)
+ nc := &nacosClient.NacosNamingClient{}
+ nc.SetClient(mockNamingClient)
+
+ regURL, _ := common.NewURL("registry://127.0.0.1:8848?registry.group=testgroup")
+ serviceName := "com.example.TestService"
+
+ //Simulate the behavior of registering and closing listeners
+ nl := NewNacosListenerWithServiceName(serviceName, regURL, nc)
+ subscribeParam := &vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: "testgroup",
+ SubscribeCallback: nl.Callback,
+ }
+ nl.subscribeParam = subscribeParam
+ listenerCache.Store(serviceName+"testgroup", nl)
+
+ mockNamingClient.EXPECT().Unsubscribe(subscribeParam).Return(nil)
+
+ // Call the CloseListener method
+ nr := &nacosRegistry{URL: regURL, namingClient: nc}
+ nr.CloseListener()
+
+ // Verify whether to clear the entries in the listenerCache
+ if _, ok := listenerCache.Load(serviceName + "testgroup"); ok {
+ t.Errorf("listenerCache was not cleared")
+ }
+
+ select {
+ case <-nl.done:
+ t.Log("nl.done channel closed successfully")
+ default:
+ t.Errorf("nl.done channel was not closed")
+ }
+}