This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new eee5acd89 Fix nacos instance overwrite (#2882)
eee5acd89 is described below
commit eee5acd896a2cbabc783a2c1f5ebda45dd6e092f
Author: Wiggins <[email protected]>
AuthorDate: Thu May 29 15:40:00 2025 +0800
Fix nacos instance overwrite (#2882)
* fix register overwrite #2581
* update #2581
* update Batch register instances with the same service name. #2581
* update #2581
* update #2581
* update #2581
* update #2581
* update:Fixed some redundancies and formatting #2581
* update:Unit tests are added, and the
metadata:dubbo.metadata-service.url-params port protocol of instances is fixed
in multi-instance cases #2581
* update:testfaild #2581
* update:#2581
* update:#2581
* update:#2581
* update:#2581
* update:#2581
* update: todo #2581
* update: name #2581
---------
Co-authored-by: wiggins <[email protected]>
---
registry/nacos/service_discovery.go | 43 ++++++++++---
registry/nacos/service_discovery_test.go | 71 ++++++++++++++++++++++
.../metadata_service_url_params_customizer.go | 1 +
.../servicediscovery/service_discovery_registry.go | 22 +++++--
4 files changed, 121 insertions(+), 16 deletions(-)
diff --git a/registry/nacos/service_discovery.go
b/registry/nacos/service_discovery.go
index 7628b1ac4..5d255bfd1 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -66,6 +66,8 @@ type nacosServiceDiscovery struct {
// cache registry instances
registryInstances []registry.ServiceInstance
+ serviceNameInstancesMap map[string][]registry.ServiceInstance //Batch
registration for the same service
+
// registryURL stores the URL used for registration, used to fetch
dynamic config like weight
registryURL *common.URL
@@ -89,12 +91,17 @@ func (n *nacosServiceDiscovery) Destroy() error {
// Register will register the service to nacos
func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance)
error {
- ins := n.toRegisterInstance(instance)
- ok, err := n.namingClient.Client().RegisterInstance(ins)
+ instSrvName := instance.GetServiceName()
+ if n.serviceNameInstancesMap == nil {
+ n.serviceNameInstancesMap =
make(map[string][]registry.ServiceInstance)
+ }
+ n.serviceNameInstancesMap[instSrvName] =
append(n.serviceNameInstancesMap[instSrvName], instance)
+ brins :=
n.toBatchRegisterInstances(n.serviceNameInstancesMap[instSrvName])
+ ok, err := n.namingClient.Client().BatchRegisterInstance(brins)
if err != nil || !ok {
- return perrors.WithMessage(err, "Could not register the
instance. "+instance.GetServiceName())
+ return perrors.Errorf("register nacos instances failed,
err:%+v", err)
}
- n.registryInstances = append(n.registryInstances, instance)
+ n.registryInstances = append(n.registryInstances, instance)
//all_instances
return nil
}
@@ -338,6 +345,21 @@ func (n *nacosServiceDiscovery)
toRegisterInstance(instance registry.ServiceInst
Ephemeral: true,
}
}
+func (n *nacosServiceDiscovery) toBatchRegisterInstances(instances
[]registry.ServiceInstance) vo.BatchRegisterInstanceParam {
+ var brins vo.BatchRegisterInstanceParam
+ var rins []vo.RegisterInstanceParam
+ for _, instance := range instances {
+ rins = append(rins, n.toRegisterInstance(instance))
+ }
+ if len(rins) == 0 {
+ logger.Warnf("No batch register instances found")
+ return vo.BatchRegisterInstanceParam{}
+ }
+ brins.ServiceName = rins[0].ServiceName
+ brins.GroupName = n.group
+ brins.Instances = rins
+ return brins
+}
// toDeregisterInstance will convert the ServiceInstance to
DeregisterInstanceParam
func (n *nacosServiceDiscovery) toDeregisterInstance(instance
registry.ServiceInstance) vo.DeregisterInstanceParam {
@@ -374,12 +396,13 @@ func newNacosServiceDiscovery(url *common.URL)
(registry.ServiceDiscovery, error
group := url.GetParam(constant.RegistryGroupKey, defaultGroup)
newInstance := &nacosServiceDiscovery{
- group: group,
- namingClient: client,
- descriptor: descriptor,
- registryInstances: []registry.ServiceInstance{},
- registryURL: url,
- instanceListenerMap: make(map[string]*gxset.HashSet),
+ group: group,
+ namingClient: client,
+ descriptor: descriptor,
+ registryInstances: []registry.ServiceInstance{},
+ serviceNameInstancesMap:
make(map[string][]registry.ServiceInstance),
+ registryURL: url,
+ instanceListenerMap: make(map[string]*gxset.HashSet),
}
return newInstance, nil
}
diff --git a/registry/nacos/service_discovery_test.go
b/registry/nacos/service_discovery_test.go
index 69124ef91..edc9d4893 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -19,7 +19,9 @@ package nacos
import (
"context"
+ "encoding/json"
"fmt"
+ "strconv"
"sync"
"testing"
)
@@ -117,6 +119,75 @@ func TestFunction(t *testing.T) {
assert.Nil(t, err)
}
+func TestBatchRegisterInstances(t *testing.T) {
+ extension.SetProtocol("mock", func() protocol.Protocol {
+ return &mockProtocol{}
+ })
+ var urls []*common.URL
+ url1, _ := common.NewURL("dubbo://127.0.0.1:8848")
+ url2, _ := common.NewURL("tri://127.0.0.1:8848")
+ port := 20000
+ urls = append(urls, url1)
+ urls = append(urls, url2)
+ for _, url := range urls {
+ pcl := url.Protocol
+ port = port + 1
+ sd, _ := newMockNacosServiceDiscovery(url)
+ defer func() {
+ _ = sd.Destroy()
+ }()
+ ins := &registry.DefaultServiceInstance{
+ ID: "testID",
+ ServiceName: "nacos_batchRegister_test1",
+ Host: url.Ip,
+ Port: port,
+ Enable: true,
+ Healthy: true,
+ Metadata: nil,
+ }
+ params := map[string]string{
+ "protocol": "mock",
+ "timeout": "",
+ "version": "",
+ pcl: "",
+ "release": "",
+ "port": strconv.Itoa(port),
+ }
+ parmjosn, _ := json.Marshal(params)
+ ins.Metadata = map[string]string{"t1": "test",
constant.MetadataServiceURLParamsPropertyName: string(parmjosn)}
+ err := sd.Register(ins)
+ assert.Nil(t, err)
+ }
+
+ url3, _ := common.NewURL("tri://127.0.0.1:8848")
+ sd, _ := newMockNacosServiceDiscovery(url3)
+ defer func() {
+ _ = sd.Destroy()
+ }()
+ ins := &registry.DefaultServiceInstance{
+ ID: "testID",
+ ServiceName: "nacos_batchRegister_test2",
+ Host: "127.0.0.1",
+ Port: 20004,
+ Enable: true,
+ Healthy: true,
+ Metadata: nil,
+ }
+ params := map[string]string{
+ "protocol": "mock",
+ "timeout": "",
+ "version": "",
+ url3.Protocol: "",
+ "release": "",
+ "port": "20004",
+ }
+ parmjosn, _ := json.Marshal(params)
+ ins.Metadata = map[string]string{"t1": "test",
constant.MetadataServiceURLParamsPropertyName: string(parmjosn)}
+ err := sd.Register(ins)
+ assert.Nil(t, err)
+
+}
+
func newMockNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery,
error) {
discoveryURL := common.NewURLWithOptions(
common.WithParams(url.GetParams()),
diff --git
a/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
b/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
index adddd0c64..e4d1b2a7c 100644
---
a/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
+++
b/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
@@ -56,6 +56,7 @@ func (m *metadataServiceURLParamsMetadataCustomizer)
GetPriority() int {
}
func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance
registry.ServiceInstance) {
+ //todo Multi-instance metadata alignment needs to be improved
url, _ := metadata.GetMetadataService().GetMetadataServiceURL()
if url == nil {
// when metadata service is not exported the url will be
nil,this is because metadata type is remote
diff --git a/registry/servicediscovery/service_discovery_registry.go
b/registry/servicediscovery/service_discovery_registry.go
index b1b387b9f..60014ec25 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -19,6 +19,7 @@ package servicediscovery
import (
"bytes"
+ "strconv"
"strings"
"sync"
"time"
@@ -88,9 +89,9 @@ func (s *serviceDiscoveryRegistry) RegisterService() error {
if metaInfo == nil {
panic("no metada info found of registry id " +
s.url.GetParam(constant.RegistryIdKey, ""))
}
- s.instance = createInstance(metaInfo)
- // consumer has no host and port, so it will not register service
- if s.instance.GetHost() != "" && s.instance.GetPort() != 0 {
+ urls := metaInfo.GetExportedServiceURLs()
+ for _, url := range urls {
+ instance := createInstance(metaInfo, url)
metaInfo.CalAndGetRevision()
if metadata.GetMetadataType() ==
constant.RemoteMetadataStorageType {
if s.metadataReport == nil {
@@ -101,15 +102,25 @@ func (s *serviceDiscoveryRegistry) RegisterService()
error {
return err
}
}
- return s.serviceDiscovery.Register(s.instance)
+ err := s.serviceDiscovery.Register(instance)
+ if err != nil {
+ return perrors.WithMessage(err, "Register service
failed")
+ }
}
return nil
}
-func createInstance(meta *info.MetadataInfo) registry.ServiceInstance {
+func createInstance(meta *info.MetadataInfo, url *common.URL)
registry.ServiceInstance {
params := make(map[string]string, 8)
params[constant.MetadataStorageTypePropertyName] =
metadata.GetMetadataType()
+ port, err := strconv.Atoi(url.Port)
+ if err != nil {
+ logger.Warnf("Parse port %s failed, err: %v", url.Port, err)
+ }
instance := &registry.DefaultServiceInstance{
+ ID: url.Address(),
+ Host: url.Ip,
+ Port: port,
ServiceName: meta.App,
Enable: true,
Healthy: true,
@@ -117,7 +128,6 @@ func createInstance(meta *info.MetadataInfo)
registry.ServiceInstance {
ServiceMetadata: meta,
Tag: meta.Tag,
}
-
for _, cus := range extension.GetCustomizers() {
cus.Customize(instance)
}