This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new eee5acd89 Fix nacos instance overwrite (#2882)
eee5acd89 is described below

commit eee5acd896a2cbabc783a2c1f5ebda45dd6e092f
Author: Wiggins <[email protected]>
AuthorDate: Thu May 29 15:40:00 2025 +0800

    Fix nacos instance overwrite (#2882)
    
    * fix register overwrite #2581
    
    * update #2581
    
    * update Batch register instances with the same service name. #2581
    
    * update #2581
    
    * update #2581
    
    * update #2581
    
    * update #2581
    
    * update:Fixed some redundancies and formatting #2581
    
    * update:Unit tests are added, and the metadata:dubbo.metadata-service.url-params port protocol of instances is fixed in multi-instance cases #2581
    
    * update:testfaild #2581
    
    * update:#2581
    
    * update:#2581
    
    * update:#2581
    
    * update:#2581
    
    * update:#2581
    
    * update: todo #2581
    
    * update: name #2581
    
    ---------
    
    Co-authored-by: wiggins <[email protected]>
---
 registry/nacos/service_discovery.go                | 43 ++++++++++---
 registry/nacos/service_discovery_test.go           | 71 ++++++++++++++++++++++
 .../metadata_service_url_params_customizer.go      |  1 +
 .../servicediscovery/service_discovery_registry.go | 22 +++++--
 4 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go
index 7628b1ac4..5d255bfd1 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -66,6 +66,8 @@ type nacosServiceDiscovery struct {
        // cache registry instances
        registryInstances []registry.ServiceInstance
 
+       serviceNameInstancesMap map[string][]registry.ServiceInstance //Batch 
registration for the same service
+
        // registryURL stores the URL used for registration, used to fetch 
dynamic config like weight
        registryURL *common.URL
 
@@ -89,12 +91,17 @@ func (n *nacosServiceDiscovery) Destroy() error {
 
 // Register will register the service to nacos
 func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) 
error {
-       ins := n.toRegisterInstance(instance)
-       ok, err := n.namingClient.Client().RegisterInstance(ins)
+       instSrvName := instance.GetServiceName()
+       if n.serviceNameInstancesMap == nil {
+               n.serviceNameInstancesMap = 
make(map[string][]registry.ServiceInstance)
+       }
+       n.serviceNameInstancesMap[instSrvName] = 
append(n.serviceNameInstancesMap[instSrvName], instance)
+       brins := 
n.toBatchRegisterInstances(n.serviceNameInstancesMap[instSrvName])
+       ok, err := n.namingClient.Client().BatchRegisterInstance(brins)
        if err != nil || !ok {
-               return perrors.WithMessage(err, "Could not register the 
instance. "+instance.GetServiceName())
+               return perrors.Errorf("register nacos instances failed, 
err:%+v", err)
        }
-       n.registryInstances = append(n.registryInstances, instance)
+       n.registryInstances = append(n.registryInstances, instance) 
//all_instances
        return nil
 }
 
@@ -338,6 +345,21 @@ func (n *nacosServiceDiscovery) 
toRegisterInstance(instance registry.ServiceInst
                Ephemeral: true,
        }
 }
+func (n *nacosServiceDiscovery) toBatchRegisterInstances(instances 
[]registry.ServiceInstance) vo.BatchRegisterInstanceParam {
+       var brins vo.BatchRegisterInstanceParam
+       var rins []vo.RegisterInstanceParam
+       for _, instance := range instances {
+               rins = append(rins, n.toRegisterInstance(instance))
+       }
+       if len(rins) == 0 {
+               logger.Warnf("No batch register instances found")
+               return vo.BatchRegisterInstanceParam{}
+       }
+       brins.ServiceName = rins[0].ServiceName
+       brins.GroupName = n.group
+       brins.Instances = rins
+       return brins
+}
 
 // toDeregisterInstance will convert the ServiceInstance to 
DeregisterInstanceParam
 func (n *nacosServiceDiscovery) toDeregisterInstance(instance 
registry.ServiceInstance) vo.DeregisterInstanceParam {
@@ -374,12 +396,13 @@ func newNacosServiceDiscovery(url *common.URL) 
(registry.ServiceDiscovery, error
 
        group := url.GetParam(constant.RegistryGroupKey, defaultGroup)
        newInstance := &nacosServiceDiscovery{
-               group:               group,
-               namingClient:        client,
-               descriptor:          descriptor,
-               registryInstances:   []registry.ServiceInstance{},
-               registryURL:         url,
-               instanceListenerMap: make(map[string]*gxset.HashSet),
+               group:                   group,
+               namingClient:            client,
+               descriptor:              descriptor,
+               registryInstances:       []registry.ServiceInstance{},
+               serviceNameInstancesMap: 
make(map[string][]registry.ServiceInstance),
+               registryURL:             url,
+               instanceListenerMap:     make(map[string]*gxset.HashSet),
        }
        return newInstance, nil
 }
diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go
index 69124ef91..edc9d4893 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -19,7 +19,9 @@ package nacos
 
 import (
        "context"
+       "encoding/json"
        "fmt"
+       "strconv"
        "sync"
        "testing"
 )
@@ -117,6 +119,75 @@ func TestFunction(t *testing.T) {
        assert.Nil(t, err)
 }
 
+func TestBatchRegisterInstances(t *testing.T) {
+       extension.SetProtocol("mock", func() protocol.Protocol {
+               return &mockProtocol{}
+       })
+       var urls []*common.URL
+       url1, _ := common.NewURL("dubbo://127.0.0.1:8848")
+       url2, _ := common.NewURL("tri://127.0.0.1:8848")
+       port := 20000
+       urls = append(urls, url1)
+       urls = append(urls, url2)
+       for _, url := range urls {
+               pcl := url.Protocol
+               port = port + 1
+               sd, _ := newMockNacosServiceDiscovery(url)
+               defer func() {
+                       _ = sd.Destroy()
+               }()
+               ins := &registry.DefaultServiceInstance{
+                       ID:          "testID",
+                       ServiceName: "nacos_batchRegister_test1",
+                       Host:        url.Ip,
+                       Port:        port,
+                       Enable:      true,
+                       Healthy:     true,
+                       Metadata:    nil,
+               }
+               params := map[string]string{
+                       "protocol": "mock",
+                       "timeout":  "",
+                       "version":  "",
+                       pcl:        "",
+                       "release":  "",
+                       "port":     strconv.Itoa(port),
+               }
+               parmjosn, _ := json.Marshal(params)
+               ins.Metadata = map[string]string{"t1": "test", 
constant.MetadataServiceURLParamsPropertyName: string(parmjosn)}
+               err := sd.Register(ins)
+               assert.Nil(t, err)
+       }
+
+       url3, _ := common.NewURL("tri://127.0.0.1:8848")
+       sd, _ := newMockNacosServiceDiscovery(url3)
+       defer func() {
+               _ = sd.Destroy()
+       }()
+       ins := &registry.DefaultServiceInstance{
+               ID:          "testID",
+               ServiceName: "nacos_batchRegister_test2",
+               Host:        "127.0.0.1",
+               Port:        20004,
+               Enable:      true,
+               Healthy:     true,
+               Metadata:    nil,
+       }
+       params := map[string]string{
+               "protocol":    "mock",
+               "timeout":     "",
+               "version":     "",
+               url3.Protocol: "",
+               "release":     "",
+               "port":        "20004",
+       }
+       parmjosn, _ := json.Marshal(params)
+       ins.Metadata = map[string]string{"t1": "test", 
constant.MetadataServiceURLParamsPropertyName: string(parmjosn)}
+       err := sd.Register(ins)
+       assert.Nil(t, err)
+
+}
+
 func newMockNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, 
error) {
        discoveryURL := common.NewURLWithOptions(
                common.WithParams(url.GetParams()),
diff --git a/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go b/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
index adddd0c64..e4d1b2a7c 100644
--- a/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
+++ b/registry/servicediscovery/customizer/metadata_service_url_params_customizer.go
@@ -56,6 +56,7 @@ func (m *metadataServiceURLParamsMetadataCustomizer) 
GetPriority() int {
 }
 
 func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance 
registry.ServiceInstance) {
+       //todo Multi-instance metadata alignment needs to be improved
        url, _ := metadata.GetMetadataService().GetMetadataServiceURL()
        if url == nil {
                // when metadata service is not exported the url will be 
nil,this is because metadata type is remote
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index b1b387b9f..60014ec25 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -19,6 +19,7 @@ package servicediscovery
 
 import (
        "bytes"
+       "strconv"
        "strings"
        "sync"
        "time"
@@ -88,9 +89,9 @@ func (s *serviceDiscoveryRegistry) RegisterService() error {
        if metaInfo == nil {
                panic("no metada info found of registry id " + 
s.url.GetParam(constant.RegistryIdKey, ""))
        }
-       s.instance = createInstance(metaInfo)
-       // consumer has no host and port, so it will not register service
-       if s.instance.GetHost() != "" && s.instance.GetPort() != 0 {
+       urls := metaInfo.GetExportedServiceURLs()
+       for _, url := range urls {
+               instance := createInstance(metaInfo, url)
                metaInfo.CalAndGetRevision()
                if metadata.GetMetadataType() == 
constant.RemoteMetadataStorageType {
                        if s.metadataReport == nil {
@@ -101,15 +102,25 @@ func (s *serviceDiscoveryRegistry) RegisterService() 
error {
                                return err
                        }
                }
-               return s.serviceDiscovery.Register(s.instance)
+               err := s.serviceDiscovery.Register(instance)
+               if err != nil {
+                       return perrors.WithMessage(err, "Register service 
failed")
+               }
        }
        return nil
 }
 
-func createInstance(meta *info.MetadataInfo) registry.ServiceInstance {
+func createInstance(meta *info.MetadataInfo, url *common.URL) 
registry.ServiceInstance {
        params := make(map[string]string, 8)
        params[constant.MetadataStorageTypePropertyName] = 
metadata.GetMetadataType()
+       port, err := strconv.Atoi(url.Port)
+       if err != nil {
+               logger.Warnf("Parse port %s failed, err: %v", url.Port, err)
+       }
        instance := &registry.DefaultServiceInstance{
+               ID:              url.Address(),
+               Host:            url.Ip,
+               Port:            port,
                ServiceName:     meta.App,
                Enable:          true,
                Healthy:         true,
@@ -117,7 +128,6 @@ func createInstance(meta *info.MetadataInfo) 
registry.ServiceInstance {
                ServiceMetadata: meta,
                Tag:             meta.Tag,
        }
-
        for _, cus := range extension.GetCustomizers() {
                cus.Customize(instance)
        }

Reply via email to