This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3a341b04a fix: fix nacos deregister when receiving shutdown signal 
(#3108)
3a341b04a is described below

commit 3a341b04a4f41ec03f1033420bb99fde1e9fc92f
Author: Xuetao Li <[email protected]>
AuthorDate: Thu Dec 18 17:33:16 2025 +0800

    fix: fix nacos deregister when receiving shutdown signal (#3108)
    
    * fix deregistry
    
    * set success to nil
    
    * update errors.Join
    
    * update errors.join
    
    * import format
    
    * add test case and add lock
    
    * add test case
    
    * import format
    
    * modify lock
---
 registry/nacos/service_discovery.go                |  13 +-
 registry/protocol/protocol.go                      |   8 +-
 .../servicediscovery/service_discovery_registry.go |  30 ++-
 .../service_discovery_registry_test.go             | 243 ++++++++++++++++++++-
 4 files changed, 274 insertions(+), 20 deletions(-)

diff --git a/registry/nacos/service_discovery.go 
b/registry/nacos/service_discovery.go
index 6bf062292..c9f3af608 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -67,7 +67,7 @@ type nacosServiceDiscovery struct {
        // cache registry instances
        registryInstances []registry.ServiceInstance
 
-       serviceNameInstancesMap map[string][]registry.ServiceInstance //Batch 
registration for the same service
+       serviceNameInstancesMap map[string][]registry.ServiceInstance // Batch 
registration for the same service
 
        // registryURL stores the URL used for registration, used to fetch 
dynamic config like weight
        registryURL *common.URL
@@ -119,7 +119,7 @@ func (n *nacosServiceDiscovery) Register(instance 
registry.ServiceInstance) erro
        if err != nil || !ok {
                return perrors.Errorf("register nacos instances failed, 
err:%+v", err)
        }
-       n.registryInstances = append(n.registryInstances, instance) 
//all_instances
+       n.registryInstances = append(n.registryInstances, instance) // 
all_instances
        return nil
 }
 
@@ -154,7 +154,7 @@ func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
 func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {
        res := gxset.NewSet()
 
-       //Filter out interface-level service DataIds
+       // Filter out interface-level service DataIds
        const pattern = `^providers:[\w\.]+(?::[\w\.]*:|::[\w\.]*)?$`
        re := regexp.MustCompile(pattern)
        for pageNo := uint32(1); ; pageNo++ {
@@ -163,7 +163,6 @@ func (n *nacosServiceDiscovery) GetServices() 
*gxset.HashSet {
                        PageNo:    pageNo,
                        GroupName: n.group,
                })
-
                if err != nil {
                        logger.Errorf("Could not query the services: %v", err)
                        return res
@@ -341,7 +340,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance 
registry.ServiceInst
                metadata = make(map[string]string, 1)
        }
 
-       //Retrieve weight (Provider takes precedence; URL may override)
+       // Retrieve weight (Provider takes precedence; URL may override)
        w := instance.GetWeight()
        // Set by Provider via WithServerWeight / WithWeight
 
@@ -354,7 +353,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance 
registry.ServiceInst
                }
        }
 
-       //Validity check
+       // Validity check
        switch {
        case w <= 0:
                w = int64(constant.DefaultNacosWeight)
@@ -378,6 +377,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance 
registry.ServiceInst
                Ephemeral: true,
        }
 }
+
 func (n *nacosServiceDiscovery) toBatchRegisterInstances(instances 
[]registry.ServiceInstance) vo.BatchRegisterInstanceParam {
        var brins vo.BatchRegisterInstanceParam
        var rins []vo.RegisterInstanceParam
@@ -401,6 +401,7 @@ func (n *nacosServiceDiscovery) 
toDeregisterInstance(instance registry.ServiceIn
                Ip:          instance.GetHost(),
                Port:        uint64(instance.GetPort()),
                GroupName:   n.group,
+               Ephemeral:   true,
        }
 }
 
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index c9b52d550..3944f3fb4 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -161,7 +161,6 @@ func (proto *registryProtocol) Refer(url *common.URL) 
base.Invoker {
 
        // This will start a new routine and listen to instance changes.
        err = dic.Subscribe(registryUrl.SubURL)
-
        if err != nil {
                logger.Errorf("consumer service %v register registry %v error, 
error message is %s",
                        serviceUrl.String(), registryUrl.String(), err.Error())
@@ -226,7 +225,7 @@ func (proto *registryProtocol) Export(originInvoker 
base.Invoker) base.Exporter
        exporter := proto.doLocalExport(originInvoker, providerUrl)
 
        // update health status
-       //health.SetServingStatusServing(registryUrl.Service())
+       // health.SetServingStatusServing(registryUrl.Service())
 
        if len(registryUrl.Protocol) > 0 {
                // url to registry
@@ -291,7 +290,7 @@ func registerServiceMap(invoker base.Invoker) error {
        // such as 
dubbo://:20000/org.apache.dubbo.UserProvider?bean.name=UserProvider&cluster=failfast...
        id := providerUrl.GetParam(constant.BeanNameKey, "")
 
-       //TODO: Temporary compatibility with old APIs, can be removed later
+       // TODO: Temporary compatibility with old APIs, can be removed later
 
        providerConfig := config.GetProviderConfig()
 
@@ -462,7 +461,7 @@ func (proto *registryProtocol) Destroy() {
                exporter := value.(*exporterChangeableWrapper)
                reg := proto.getRegistry(getRegistryUrl(exporter.originInvoker))
                if err := reg.UnRegister(exporter.registerUrl); err != nil {
-                       panic(err)
+                       logger.Warnf("Unregister consumer url failed, %s, 
error: %w", exporter.registerUrl.String(), err)
                }
                // TODO unsubscribeUrl
 
@@ -486,7 +485,6 @@ func (proto *registryProtocol) Destroy() {
                                        return
                                }
                        }
-
                }()
                return true
        })
diff --git a/registry/servicediscovery/service_discovery_registry.go 
b/registry/servicediscovery/service_discovery_registry.go
index 2fe06d293..4760a77ff 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -18,6 +18,7 @@
 package servicediscovery
 
 import (
+       "errors"
        "strconv"
        "strings"
        "sync"
@@ -59,7 +60,7 @@ type serviceDiscoveryRegistry struct {
        lock                    sync.RWMutex
        url                     *common.URL
        serviceDiscovery        registry.ServiceDiscovery
-       instance                registry.ServiceInstance
+       instances               []registry.ServiceInstance
        serviceNameMapping      mapping.ServiceNameMapping
        metadataReport          report.MetadataReport
        serviceListeners        
map[string]registry.ServiceInstancesChangedListener
@@ -104,6 +105,9 @@ func (s *serviceDiscoveryRegistry) RegisterService() error {
                if err != nil {
                        return perrors.WithMessage(err, "Register service 
failed")
                }
+               s.lock.Lock()
+               s.instances = append(s.instances, instance)
+               s.lock.Unlock()
        }
        return nil
 }
@@ -133,14 +137,32 @@ func createInstance(meta *info.MetadataInfo, url 
*common.URL) registry.ServiceIn
 }
 
 func (s *serviceDiscoveryRegistry) UnRegisterService() error {
-       return s.serviceDiscovery.Unregister(s.instance)
+       s.lock.Lock()
+       keep := s.instances[:0]
+       origin := s.instances[:]
+       s.lock.Unlock()
+
+       var errs []error
+
+       for _, v := range origin {
+               if err := s.serviceDiscovery.Unregister(v); err != nil {
+                       // fail to unregister
+                       keep = append(keep, v)
+                       errs = append(errs, err)
+               }
+       }
+
+       s.lock.Lock()
+       s.instances = keep
+       s.lock.Unlock()
+       return errors.Join(errs...)
 }
 
 func (s *serviceDiscoveryRegistry) UnRegister(url *common.URL) error {
        if !shouldRegister(url) {
                return nil
        }
-       return nil
+       return s.UnRegisterService()
 }
 
 func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener 
registry.NotifyListener) error {
@@ -170,7 +192,7 @@ func parseServices(literalServices string) *gxset.HashSet {
        if len(literalServices) == 0 {
                return set
        }
-       var splitServices = strings.Split(literalServices, ",")
+       splitServices := strings.Split(literalServices, ",")
        for _, s := range splitServices {
                if len(s) != 0 {
                        set.Add(s)
diff --git a/registry/servicediscovery/service_discovery_registry_test.go 
b/registry/servicediscovery/service_discovery_registry_test.go
index c466c0429..592ad1c3e 100644
--- a/registry/servicediscovery/service_discovery_registry_test.go
+++ b/registry/servicediscovery/service_discovery_registry_test.go
@@ -19,6 +19,7 @@ package servicediscovery
 
 import (
        "context"
+       "errors"
        "fmt"
        "sync"
        "testing"
@@ -147,7 +148,7 @@ func TestServiceDiscoveryRegistryUnSubscribe(t *testing.T) {
 }
 
 // setupEnvironment initializes the test environment.
-func setupEnvironment(t *testing.T) (*mockServiceDiscovery, 
*mockServiceNameMapping) {
+func setupEnvironment(_ *testing.T) (*mockServiceDiscovery, 
*mockServiceNameMapping) {
        appConfig := global.DefaultApplicationConfig()
        appConfig.Name = testApp
 
@@ -175,6 +176,11 @@ type mockServiceDiscovery struct {
        listenerAdded    bool
        capturedAppName  string
        capturedInstance registry.ServiceInstance
+
+       // for Unregister tests
+       unregisterCalled  bool
+       unregisterIDs     []string
+       unregisterErrByID map[string]error
 }
 
 func (m *mockServiceDiscovery) String() string { return "mock" }
@@ -184,10 +190,21 @@ func (m *mockServiceDiscovery) Register(inst 
registry.ServiceInstance) error {
        m.capturedInstance = inst
        return nil
 }
-func (m *mockServiceDiscovery) Update(inst registry.ServiceInstance) error     
{ return nil }
-func (m *mockServiceDiscovery) Unregister(inst registry.ServiceInstance) error 
{ return nil }
-func (m *mockServiceDiscovery) GetDefaultPageSize() int                        
{ return 10 }
-func (m *mockServiceDiscovery) GetServices() *gxset.HashSet                    
{ return gxset.NewSet("mock-service") }
+func (m *mockServiceDiscovery) Update(inst registry.ServiceInstance) error { 
return nil }
+func (m *mockServiceDiscovery) Unregister(inst registry.ServiceInstance) error 
{
+       m.unregisterCalled = true
+       if inst != nil {
+               m.unregisterIDs = append(m.unregisterIDs, inst.GetID())
+               if m.unregisterErrByID != nil {
+                       if err, ok := m.unregisterErrByID[inst.GetID()]; ok {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+func (m *mockServiceDiscovery) GetDefaultPageSize() int     { return 10 }
+func (m *mockServiceDiscovery) GetServices() *gxset.HashSet { return 
gxset.NewSet("mock-service") }
 func (m *mockServiceDiscovery) GetInstances(name string) 
[]registry.ServiceInstance {
        m.capturedAppName = name
        return []registry.ServiceInstance{}
@@ -196,9 +213,11 @@ func (m *mockServiceDiscovery) GetInstancesByPage(string, 
int, int) gxpage.Pager
 func (m *mockServiceDiscovery) GetHealthyInstancesByPage(string, int, int, 
bool) gxpage.Pager {
        return nil
 }
+
 func (m *mockServiceDiscovery) GetRequestInstances([]string, int, int) 
map[string]gxpage.Pager {
        return nil
 }
+
 func (m *mockServiceDiscovery) 
AddListener(registry.ServiceInstancesChangedListener) error {
        defer m.wg.Done()
        m.listenerAdded = true
@@ -220,6 +239,7 @@ func (m *mockServiceNameMapping) Map(url *common.URL) error 
{
        m.data[serviceInterface] = gxset.NewSet(appName)
        return nil
 }
+
 func (m *mockServiceNameMapping) Get(url *common.URL, _ 
mapping.MappingListener) (*gxset.HashSet, error) {
        m.getCalled = true
        m.capturedGroup = url.GetParam(constant.GroupKey, "")
@@ -236,6 +256,7 @@ type mockNotifyListener struct{}
 func (m *mockNotifyListener) Notify(*registry.ServiceEvent) {
        // for mocking
 }
+
 func (m *mockNotifyListener) NotifyAll([]*registry.ServiceEvent, func()) {
        // for mocking
 }
@@ -279,3 +300,215 @@ func (m *mockInvoker) Destroy() {
        // for mocking
 }
 func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) 
protocol.Result { return nil }
+
+// TestServiceDiscoveryRegistryUnRegister_AllSuccess verifies 
UnRegisterService bulk deregister success.
+func TestServiceDiscoveryRegistryUnRegister_AllSuccess(t *testing.T) {
+       mockSD, _ := setupEnvironment(t)
+
+       registryURL, _ := common.NewURL(testRegistryURL,
+               common.WithParamsValue(constant.RegistryKey, "mock"))
+
+       reg, err := newServiceDiscoveryRegistry(registryURL)
+       assert.NoError(t, err)
+
+       sdReg, ok := reg.(*serviceDiscoveryRegistry)
+       assert.True(t, ok)
+
+       inst1 := &registry.DefaultServiceInstance{
+               ID:          "inst-1",
+               ServiceName: testApp,
+               Host:        "127.0.0.1",
+               Port:        20880,
+               Enable:      true,
+               Healthy:     true,
+               Metadata:    map[string]string{"k": "v"},
+       }
+       inst2 := &registry.DefaultServiceInstance{
+               ID:          "inst-2",
+               ServiceName: testApp,
+               Host:        "127.0.0.1",
+               Port:        20881,
+               Enable:      true,
+               Healthy:     true,
+               Metadata:    map[string]string{"k2": "v2"},
+       }
+
+       sdReg.instances = []registry.ServiceInstance{inst1, inst2}
+       mockSD.unregisterErrByID = nil
+
+       err = sdReg.UnRegisterService()
+       assert.NoError(t, err)
+
+       // all instances should be removed after successful deregister
+       assert.Empty(t, sdReg.instances)
+
+       // verify Unregister called for each instance
+       assert.True(t, mockSD.unregisterCalled)
+       assert.ElementsMatch(t, []string{"inst-1", "inst-2"}, 
mockSD.unregisterIDs)
+}
+
+// TestServiceDiscoveryRegistryUnRegister_PartialFail verifies failed 
instances are kept and errors are returned.
+func TestServiceDiscoveryRegistryUnRegister_PartialFail(t *testing.T) {
+       mockSD, _ := setupEnvironment(t)
+
+       registryURL, _ := common.NewURL(testRegistryURL,
+               common.WithParamsValue(constant.RegistryKey, "mock"))
+
+       reg, err := newServiceDiscoveryRegistry(registryURL)
+       assert.NoError(t, err)
+
+       sdReg, ok := reg.(*serviceDiscoveryRegistry)
+       assert.True(t, ok)
+
+       inst1 := &registry.DefaultServiceInstance{
+               ID:          "inst-1",
+               ServiceName: testApp,
+               Host:        "127.0.0.1",
+               Port:        20880,
+               Enable:      true,
+               Healthy:     true,
+       }
+       inst2 := &registry.DefaultServiceInstance{
+               ID:          "inst-2",
+               ServiceName: testApp,
+               Host:        "127.0.0.1",
+               Port:        20881,
+               Enable:      true,
+               Healthy:     true,
+       }
+
+       sdReg.instances = []registry.ServiceInstance{inst1, inst2}
+
+       mockSD.unregisterErrByID = map[string]error{
+               "inst-1": errors.New("mock unregister failed"),
+       }
+
+       err = sdReg.UnRegisterService()
+       assert.Error(t, err)
+
+       // failed instance should remain, successful one removed
+       assert.Equal(t, 1, len(sdReg.instances))
+       assert.Equal(t, "inst-1", sdReg.instances[0].GetID())
+
+       // still called for both
+       assert.True(t, mockSD.unregisterCalled)
+       assert.ElementsMatch(t, []string{"inst-1", "inst-2"}, 
mockSD.unregisterIDs)
+}
+
+// TestServiceDiscoveryRegistryUnRegister_NoInstances verifies 
UnRegisterService handles empty list.
+func TestServiceDiscoveryRegistryUnRegister_NoInstances(t *testing.T) {
+       mockSD, _ := setupEnvironment(t)
+
+       registryURL, _ := common.NewURL(testRegistryURL,
+               common.WithParamsValue(constant.RegistryKey, "mock"))
+
+       reg, err := newServiceDiscoveryRegistry(registryURL)
+       assert.NoError(t, err)
+
+       sdReg, ok := reg.(*serviceDiscoveryRegistry)
+       assert.True(t, ok)
+
+       sdReg.instances = []registry.ServiceInstance{}
+
+       err = sdReg.UnRegisterService()
+       assert.NoError(t, err)
+
+       // should stay empty
+       assert.Empty(t, sdReg.instances)
+
+       // should not call Unregister
+       assert.False(t, mockSD.unregisterCalled)
+       assert.Empty(t, mockSD.unregisterIDs)
+}
+
+// TestServiceDiscoveryRegistryUnRegister_Concurrent verifies 
UnRegisterService under concurrent access.
+func TestServiceDiscoveryRegistryUnRegister_Concurrent(t *testing.T) {
+       mockSD, _ := setupEnvironment(t)
+
+       registryURL, _ := common.NewURL(testRegistryURL,
+               common.WithParamsValue(constant.RegistryKey, "mock"))
+
+       reg, err := newServiceDiscoveryRegistry(registryURL)
+       assert.NoError(t, err)
+
+       sdReg, ok := reg.(*serviceDiscoveryRegistry)
+       assert.True(t, ok)
+
+       // prepare initial instances
+       for i := 0; i < 5; i++ {
+               inst := &registry.DefaultServiceInstance{
+                       ID:          fmt.Sprintf("init-%d", i),
+                       ServiceName: testApp,
+                       Host:        "127.0.0.1",
+                       Port:        20000 + i,
+                       Enable:      true,
+                       Healthy:     true,
+               }
+               sdReg.instances = append(sdReg.instances, inst)
+       }
+
+       mockSD.unregisterErrByID = map[string]error{
+               "init-2": errors.New("mock unregister failed"),
+       }
+
+       var wg sync.WaitGroup
+       panicCh := make(chan any, 2)
+
+       // goroutine 1: unregister
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               defer func() {
+                       if r := recover(); r != nil {
+                               panicCh <- r
+                       }
+               }()
+               _ = sdReg.UnRegisterService()
+       }()
+
+       // goroutine 2: simulate concurrent register / instance append
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               defer func() {
+                       if r := recover(); r != nil {
+                               panicCh <- r
+                       }
+               }()
+
+               for i := 0; i < 3; i++ {
+                       inst := &registry.DefaultServiceInstance{
+                               ID:          fmt.Sprintf("concurrent-%d", i),
+                               ServiceName: testApp,
+                               Host:        "127.0.0.1",
+                               Port:        21000 + i,
+                               Enable:      true,
+                               Healthy:     true,
+                       }
+                       // intentionally no lock: simulating real race
+                       sdReg.instances = append(sdReg.instances, inst)
+                       time.Sleep(10 * time.Millisecond)
+               }
+       }()
+
+       wg.Wait()
+       close(panicCh)
+
+       // assert no panic happened
+       for p := range panicCh {
+               t.Fatalf("panic occurred during concurrent unregister: %v", p)
+       }
+
+       // at least the failed instance should remain
+       foundFailed := false
+       for _, inst := range sdReg.instances {
+               if inst.GetID() == "init-2" {
+                       foundFailed = true
+                       break
+               }
+       }
+       assert.True(t, foundFailed, "failed instance should be kept after 
UnRegisterService")
+
+       // unregister should be called
+       assert.True(t, mockSD.unregisterCalled)
+}

Reply via email to