This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 3a341b04a fix: fix nacos deregister when receiving shutdown signal (#3108)
3a341b04a is described below
commit 3a341b04a4f41ec03f1033420bb99fde1e9fc92f
Author: Xuetao Li <[email protected]>
AuthorDate: Thu Dec 18 17:33:16 2025 +0800
fix: fix nacos deregister when receiving shutdown signal (#3108)
* fix deregistry
* set success to nil
* update errors.Join
* update errors.join
* import format
* add test case and add lock
* add test case
* import format
* modify lock
---
registry/nacos/service_discovery.go | 13 +-
registry/protocol/protocol.go | 8 +-
.../servicediscovery/service_discovery_registry.go | 30 ++-
.../service_discovery_registry_test.go | 243 ++++++++++++++++++++-
4 files changed, 274 insertions(+), 20 deletions(-)
diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go
index 6bf062292..c9f3af608 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -67,7 +67,7 @@ type nacosServiceDiscovery struct {
// cache registry instances
registryInstances []registry.ServiceInstance
- serviceNameInstancesMap map[string][]registry.ServiceInstance //Batch
registration for the same service
+ serviceNameInstancesMap map[string][]registry.ServiceInstance // Batch
registration for the same service
// registryURL stores the URL used for registration, used to fetch
dynamic config like weight
registryURL *common.URL
@@ -119,7 +119,7 @@ func (n *nacosServiceDiscovery) Register(instance
registry.ServiceInstance) erro
if err != nil || !ok {
return perrors.Errorf("register nacos instances failed,
err:%+v", err)
}
- n.registryInstances = append(n.registryInstances, instance)
//all_instances
+ n.registryInstances = append(n.registryInstances, instance) //
all_instances
return nil
}
@@ -154,7 +154,7 @@ func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {
res := gxset.NewSet()
- //Filter out interface-level service DataIds
+ // Filter out interface-level service DataIds
const pattern = `^providers:[\w\.]+(?::[\w\.]*:|::[\w\.]*)?$`
re := regexp.MustCompile(pattern)
for pageNo := uint32(1); ; pageNo++ {
@@ -163,7 +163,6 @@ func (n *nacosServiceDiscovery) GetServices()
*gxset.HashSet {
PageNo: pageNo,
GroupName: n.group,
})
-
if err != nil {
logger.Errorf("Could not query the services: %v", err)
return res
@@ -341,7 +340,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance
registry.ServiceInst
metadata = make(map[string]string, 1)
}
- //Retrieve weight (Provider takes precedence; URL may override)
+ // Retrieve weight (Provider takes precedence; URL may override)
w := instance.GetWeight()
// Set by Provider via WithServerWeight / WithWeight
@@ -354,7 +353,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance
registry.ServiceInst
}
}
- //Validity check
+ // Validity check
switch {
case w <= 0:
w = int64(constant.DefaultNacosWeight)
@@ -378,6 +377,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance
registry.ServiceInst
Ephemeral: true,
}
}
+
func (n *nacosServiceDiscovery) toBatchRegisterInstances(instances
[]registry.ServiceInstance) vo.BatchRegisterInstanceParam {
var brins vo.BatchRegisterInstanceParam
var rins []vo.RegisterInstanceParam
@@ -401,6 +401,7 @@ func (n *nacosServiceDiscovery)
toDeregisterInstance(instance registry.ServiceIn
Ip: instance.GetHost(),
Port: uint64(instance.GetPort()),
GroupName: n.group,
+ Ephemeral: true,
}
}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index c9b52d550..3944f3fb4 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -161,7 +161,6 @@ func (proto *registryProtocol) Refer(url *common.URL)
base.Invoker {
// This will start a new routine and listen to instance changes.
err = dic.Subscribe(registryUrl.SubURL)
-
if err != nil {
logger.Errorf("consumer service %v register registry %v error,
error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
@@ -226,7 +225,7 @@ func (proto *registryProtocol) Export(originInvoker
base.Invoker) base.Exporter
exporter := proto.doLocalExport(originInvoker, providerUrl)
// update health status
- //health.SetServingStatusServing(registryUrl.Service())
+ // health.SetServingStatusServing(registryUrl.Service())
if len(registryUrl.Protocol) > 0 {
// url to registry
@@ -291,7 +290,7 @@ func registerServiceMap(invoker base.Invoker) error {
// such as
dubbo://:20000/org.apache.dubbo.UserProvider?bean.name=UserProvider&cluster=failfast...
id := providerUrl.GetParam(constant.BeanNameKey, "")
- //TODO: Temporary compatibility with old APIs, can be removed later
+ // TODO: Temporary compatibility with old APIs, can be removed later
providerConfig := config.GetProviderConfig()
@@ -462,7 +461,7 @@ func (proto *registryProtocol) Destroy() {
exporter := value.(*exporterChangeableWrapper)
reg := proto.getRegistry(getRegistryUrl(exporter.originInvoker))
if err := reg.UnRegister(exporter.registerUrl); err != nil {
- panic(err)
+ logger.Warnf("Unregister consumer url failed, %s,
error: %w", exporter.registerUrl.String(), err)
}
// TODO unsubscribeUrl
@@ -486,7 +485,6 @@ func (proto *registryProtocol) Destroy() {
return
}
}
-
}()
return true
})
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index 2fe06d293..4760a77ff 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -18,6 +18,7 @@
package servicediscovery
import (
+ "errors"
"strconv"
"strings"
"sync"
@@ -59,7 +60,7 @@ type serviceDiscoveryRegistry struct {
lock sync.RWMutex
url *common.URL
serviceDiscovery registry.ServiceDiscovery
- instance registry.ServiceInstance
+ instances []registry.ServiceInstance
serviceNameMapping mapping.ServiceNameMapping
metadataReport report.MetadataReport
serviceListeners
map[string]registry.ServiceInstancesChangedListener
@@ -104,6 +105,9 @@ func (s *serviceDiscoveryRegistry) RegisterService() error {
if err != nil {
return perrors.WithMessage(err, "Register service
failed")
}
+ s.lock.Lock()
+ s.instances = append(s.instances, instance)
+ s.lock.Unlock()
}
return nil
}
@@ -133,14 +137,32 @@ func createInstance(meta *info.MetadataInfo, url
*common.URL) registry.ServiceIn
}
func (s *serviceDiscoveryRegistry) UnRegisterService() error {
- return s.serviceDiscovery.Unregister(s.instance)
+ s.lock.Lock()
+ keep := s.instances[:0]
+ origin := s.instances[:]
+ s.lock.Unlock()
+
+ var errs []error
+
+ for _, v := range origin {
+ if err := s.serviceDiscovery.Unregister(v); err != nil {
+ // fail to unregister
+ keep = append(keep, v)
+ errs = append(errs, err)
+ }
+ }
+
+ s.lock.Lock()
+ s.instances = keep
+ s.lock.Unlock()
+ return errors.Join(errs...)
}
func (s *serviceDiscoveryRegistry) UnRegister(url *common.URL) error {
if !shouldRegister(url) {
return nil
}
- return nil
+ return s.UnRegisterService()
}
func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener
registry.NotifyListener) error {
@@ -170,7 +192,7 @@ func parseServices(literalServices string) *gxset.HashSet {
if len(literalServices) == 0 {
return set
}
- var splitServices = strings.Split(literalServices, ",")
+ splitServices := strings.Split(literalServices, ",")
for _, s := range splitServices {
if len(s) != 0 {
set.Add(s)
diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go
index c466c0429..592ad1c3e 100644
--- a/registry/servicediscovery/service_discovery_registry_test.go
+++ b/registry/servicediscovery/service_discovery_registry_test.go
@@ -19,6 +19,7 @@ package servicediscovery
import (
"context"
+ "errors"
"fmt"
"sync"
"testing"
@@ -147,7 +148,7 @@ func TestServiceDiscoveryRegistryUnSubscribe(t *testing.T) {
}
// setupEnvironment initializes the test environment.
-func setupEnvironment(t *testing.T) (*mockServiceDiscovery,
*mockServiceNameMapping) {
+func setupEnvironment(_ *testing.T) (*mockServiceDiscovery,
*mockServiceNameMapping) {
appConfig := global.DefaultApplicationConfig()
appConfig.Name = testApp
@@ -175,6 +176,11 @@ type mockServiceDiscovery struct {
listenerAdded bool
capturedAppName string
capturedInstance registry.ServiceInstance
+
+ // for Unregister tests
+ unregisterCalled bool
+ unregisterIDs []string
+ unregisterErrByID map[string]error
}
func (m *mockServiceDiscovery) String() string { return "mock" }
@@ -184,10 +190,21 @@ func (m *mockServiceDiscovery) Register(inst
registry.ServiceInstance) error {
m.capturedInstance = inst
return nil
}
-func (m *mockServiceDiscovery) Update(inst registry.ServiceInstance) error
{ return nil }
-func (m *mockServiceDiscovery) Unregister(inst registry.ServiceInstance) error
{ return nil }
-func (m *mockServiceDiscovery) GetDefaultPageSize() int
{ return 10 }
-func (m *mockServiceDiscovery) GetServices() *gxset.HashSet
{ return gxset.NewSet("mock-service") }
+func (m *mockServiceDiscovery) Update(inst registry.ServiceInstance) error {
return nil }
+func (m *mockServiceDiscovery) Unregister(inst registry.ServiceInstance) error
{
+ m.unregisterCalled = true
+ if inst != nil {
+ m.unregisterIDs = append(m.unregisterIDs, inst.GetID())
+ if m.unregisterErrByID != nil {
+ if err, ok := m.unregisterErrByID[inst.GetID()]; ok {
+ return err
+ }
+ }
+ }
+ return nil
+}
+func (m *mockServiceDiscovery) GetDefaultPageSize() int { return 10 }
+func (m *mockServiceDiscovery) GetServices() *gxset.HashSet { return
gxset.NewSet("mock-service") }
func (m *mockServiceDiscovery) GetInstances(name string)
[]registry.ServiceInstance {
m.capturedAppName = name
return []registry.ServiceInstance{}
@@ -196,9 +213,11 @@ func (m *mockServiceDiscovery) GetInstancesByPage(string,
int, int) gxpage.Pager
func (m *mockServiceDiscovery) GetHealthyInstancesByPage(string, int, int,
bool) gxpage.Pager {
return nil
}
+
func (m *mockServiceDiscovery) GetRequestInstances([]string, int, int)
map[string]gxpage.Pager {
return nil
}
+
func (m *mockServiceDiscovery)
AddListener(registry.ServiceInstancesChangedListener) error {
defer m.wg.Done()
m.listenerAdded = true
@@ -220,6 +239,7 @@ func (m *mockServiceNameMapping) Map(url *common.URL) error
{
m.data[serviceInterface] = gxset.NewSet(appName)
return nil
}
+
func (m *mockServiceNameMapping) Get(url *common.URL, _
mapping.MappingListener) (*gxset.HashSet, error) {
m.getCalled = true
m.capturedGroup = url.GetParam(constant.GroupKey, "")
@@ -236,6 +256,7 @@ type mockNotifyListener struct{}
func (m *mockNotifyListener) Notify(*registry.ServiceEvent) {
// for mocking
}
+
func (m *mockNotifyListener) NotifyAll([]*registry.ServiceEvent, func()) {
// for mocking
}
@@ -279,3 +300,215 @@ func (m *mockInvoker) Destroy() {
// for mocking
}
func (m *mockInvoker) Invoke(context.Context, protocol.Invocation)
protocol.Result { return nil }
+
+// TestServiceDiscoveryRegistryUnRegister_AllSuccess verifies
UnRegisterService bulk deregister success.
+func TestServiceDiscoveryRegistryUnRegister_AllSuccess(t *testing.T) {
+ mockSD, _ := setupEnvironment(t)
+
+ registryURL, _ := common.NewURL(testRegistryURL,
+ common.WithParamsValue(constant.RegistryKey, "mock"))
+
+ reg, err := newServiceDiscoveryRegistry(registryURL)
+ assert.NoError(t, err)
+
+ sdReg, ok := reg.(*serviceDiscoveryRegistry)
+ assert.True(t, ok)
+
+ inst1 := &registry.DefaultServiceInstance{
+ ID: "inst-1",
+ ServiceName: testApp,
+ Host: "127.0.0.1",
+ Port: 20880,
+ Enable: true,
+ Healthy: true,
+ Metadata: map[string]string{"k": "v"},
+ }
+ inst2 := &registry.DefaultServiceInstance{
+ ID: "inst-2",
+ ServiceName: testApp,
+ Host: "127.0.0.1",
+ Port: 20881,
+ Enable: true,
+ Healthy: true,
+ Metadata: map[string]string{"k2": "v2"},
+ }
+
+ sdReg.instances = []registry.ServiceInstance{inst1, inst2}
+ mockSD.unregisterErrByID = nil
+
+ err = sdReg.UnRegisterService()
+ assert.NoError(t, err)
+
+ // all instances should be removed after successful deregister
+ assert.Empty(t, sdReg.instances)
+
+ // verify Unregister called for each instance
+ assert.True(t, mockSD.unregisterCalled)
+ assert.ElementsMatch(t, []string{"inst-1", "inst-2"},
mockSD.unregisterIDs)
+}
+
+// TestServiceDiscoveryRegistryUnRegister_PartialFail verifies failed
instances are kept and errors are returned.
+func TestServiceDiscoveryRegistryUnRegister_PartialFail(t *testing.T) {
+ mockSD, _ := setupEnvironment(t)
+
+ registryURL, _ := common.NewURL(testRegistryURL,
+ common.WithParamsValue(constant.RegistryKey, "mock"))
+
+ reg, err := newServiceDiscoveryRegistry(registryURL)
+ assert.NoError(t, err)
+
+ sdReg, ok := reg.(*serviceDiscoveryRegistry)
+ assert.True(t, ok)
+
+ inst1 := &registry.DefaultServiceInstance{
+ ID: "inst-1",
+ ServiceName: testApp,
+ Host: "127.0.0.1",
+ Port: 20880,
+ Enable: true,
+ Healthy: true,
+ }
+ inst2 := &registry.DefaultServiceInstance{
+ ID: "inst-2",
+ ServiceName: testApp,
+ Host: "127.0.0.1",
+ Port: 20881,
+ Enable: true,
+ Healthy: true,
+ }
+
+ sdReg.instances = []registry.ServiceInstance{inst1, inst2}
+
+ mockSD.unregisterErrByID = map[string]error{
+ "inst-1": errors.New("mock unregister failed"),
+ }
+
+ err = sdReg.UnRegisterService()
+ assert.Error(t, err)
+
+ // failed instance should remain, successful one removed
+ assert.Equal(t, 1, len(sdReg.instances))
+ assert.Equal(t, "inst-1", sdReg.instances[0].GetID())
+
+ // still called for both
+ assert.True(t, mockSD.unregisterCalled)
+ assert.ElementsMatch(t, []string{"inst-1", "inst-2"},
mockSD.unregisterIDs)
+}
+
+// TestServiceDiscoveryRegistryUnRegister_NoInstances verifies
UnRegisterService handles empty list.
+func TestServiceDiscoveryRegistryUnRegister_NoInstances(t *testing.T) {
+ mockSD, _ := setupEnvironment(t)
+
+ registryURL, _ := common.NewURL(testRegistryURL,
+ common.WithParamsValue(constant.RegistryKey, "mock"))
+
+ reg, err := newServiceDiscoveryRegistry(registryURL)
+ assert.NoError(t, err)
+
+ sdReg, ok := reg.(*serviceDiscoveryRegistry)
+ assert.True(t, ok)
+
+ sdReg.instances = []registry.ServiceInstance{}
+
+ err = sdReg.UnRegisterService()
+ assert.NoError(t, err)
+
+ // should stay empty
+ assert.Empty(t, sdReg.instances)
+
+ // should not call Unregister
+ assert.False(t, mockSD.unregisterCalled)
+ assert.Empty(t, mockSD.unregisterIDs)
+}
+
+// TestServiceDiscoveryRegistryUnRegister_Concurrent verifies
UnRegisterService under concurrent access.
+func TestServiceDiscoveryRegistryUnRegister_Concurrent(t *testing.T) {
+ mockSD, _ := setupEnvironment(t)
+
+ registryURL, _ := common.NewURL(testRegistryURL,
+ common.WithParamsValue(constant.RegistryKey, "mock"))
+
+ reg, err := newServiceDiscoveryRegistry(registryURL)
+ assert.NoError(t, err)
+
+ sdReg, ok := reg.(*serviceDiscoveryRegistry)
+ assert.True(t, ok)
+
+ // prepare initial instances
+ for i := 0; i < 5; i++ {
+ inst := &registry.DefaultServiceInstance{
+ ID: fmt.Sprintf("init-%d", i),
+ ServiceName: testApp,
+ Host: "127.0.0.1",
+ Port: 20000 + i,
+ Enable: true,
+ Healthy: true,
+ }
+ sdReg.instances = append(sdReg.instances, inst)
+ }
+
+ mockSD.unregisterErrByID = map[string]error{
+ "init-2": errors.New("mock unregister failed"),
+ }
+
+ var wg sync.WaitGroup
+ panicCh := make(chan any, 2)
+
+ // goroutine 1: unregister
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ panicCh <- r
+ }
+ }()
+ _ = sdReg.UnRegisterService()
+ }()
+
+ // goroutine 2: simulate concurrent register / instance append
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ panicCh <- r
+ }
+ }()
+
+ for i := 0; i < 3; i++ {
+ inst := &registry.DefaultServiceInstance{
+ ID: fmt.Sprintf("concurrent-%d", i),
+ ServiceName: testApp,
+ Host: "127.0.0.1",
+ Port: 21000 + i,
+ Enable: true,
+ Healthy: true,
+ }
+ // intentionally no lock: simulating real race
+ sdReg.instances = append(sdReg.instances, inst)
+ time.Sleep(10 * time.Millisecond)
+ }
+ }()
+
+ wg.Wait()
+ close(panicCh)
+
+ // assert no panic happened
+ for p := range panicCh {
+ t.Fatalf("panic occurred during concurrent unregister: %v", p)
+ }
+
+ // at least the failed instance should remain
+ foundFailed := false
+ for _, inst := range sdReg.instances {
+ if inst.GetID() == "init-2" {
+ foundFailed = true
+ break
+ }
+ }
+ assert.True(t, foundFailed, "failed instance should be kept after
UnRegisterService")
+
+ // unregister should be called
+ assert.True(t, mockSD.unregisterCalled)
+}