This is an automated email from the ASF dual-hosted git repository.

baerwang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new f2773ea2 feat: supports nacos service discovery (#651)
f2773ea2 is described below

commit f2773ea261aa5a4f648330cb5dcc866c617e7c63
Author: marsevilspirit <[email protected]>
AuthorDate: Sun Mar 9 19:25:59 2025 +0800

    feat: supports nacos service discovery (#651)
    
    * support nacos application discovery
    
    * delete debug info
    
    * format import
    
    * add some comments
    
    * fix golangci-lint error
    
    * delete todo
    
    * fix some comment
    
    * update
    
    * change bool to struct{}
---
 .../registry/nacos/application_listener.go         | 174 +++++++++++++++++
 .../registry/nacos/application_service_listener.go | 213 +++++++++++++++++++++
 .../registry/nacos/interface_listener.go           |  64 +++----
 .../dubboregistry/registry/nacos/registry.go       |   8 +-
 .../registry/nacos/service_listener.go             |   4 +-
 .../registry/zookeeper/application_listener.go     |   1 -
 .../zookeeper/application_service_listener.go      |  22 +--
 .../registry/zookeeper/interface_listener.go       |   1 -
 .../registry/zookeeper/service_listener.go         |   2 -
 9 files changed, 432 insertions(+), 57 deletions(-)

diff --git a/pkg/adapter/dubboregistry/registry/nacos/application_listener.go 
b/pkg/adapter/dubboregistry/registry/nacos/application_listener.go
new file mode 100644
index 00000000..bf693729
--- /dev/null
+++ b/pkg/adapter/dubboregistry/registry/nacos/application_listener.go
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+       "strings"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+       "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+import (
+       common2 
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
+       
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+var _ registry.Listener = new(nacosAppListener)
+
+// nacosAppListener is the application-level registry.Listener for nacos. It
+// keeps one appServiceListener per application name it has subscribed to.
+type nacosAppListener struct {
+       // exit is closed by Close.
+       exit            chan struct{}
+       // client is used to list services and to subscribe to them.
+       client          naming_client.INamingClient
+       // regConf supplies the group and namespace used when querying nacos.
+       regConf         *model.Registry
+       reg             *NacosRegistry
+       // wg tracks the watch goroutine started by WatchAndHandle.
+       wg              sync.WaitGroup
+       // addr is copied from regConf.Address by the constructor.
+       addr            string
+       // adapterListener is handed to every appServiceListener that is created.
+       adapterListener common2.RegistryEventListener
+       // appInfoMap holds the known applications, keyed by application name.
+       appInfoMap      map[string]*applicationInfo
+}
+
+// newNacosAppListener builds the application-level listener for a nacos
+// registry. It only initialises the struct (exit channel, empty application
+// map, address copied from regConf); nothing runs until WatchAndHandle is
+// called.
+func newNacosAppListener(client naming_client.INamingClient, reg 
*NacosRegistry, regConf *model.Registry, adapterListener 
common2.RegistryEventListener) registry.Listener {
+       return &nacosAppListener{
+               exit:            make(chan struct{}),
+               client:          client,
+               regConf:         regConf,
+               reg:             reg,
+               addr:            regConf.Address,
+               adapterListener: adapterListener,
+               appInfoMap:      map[string]*applicationInfo{},
+       }
+}
+
+// Close closes the exit channel and then blocks until every goroutine
+// registered on n.wg has finished. Calling it twice panics because the
+// channel would be closed a second time.
+func (n *nacosAppListener) Close() {
+       close(n.exit)
+       n.wg.Wait()
+}
+
+// WatchAndHandle starts the watch loop in its own goroutine and registers it
+// on n.wg so that Close can wait for it.
+func (n *nacosAppListener) WatchAndHandle() {
+       n.wg.Add(1)
+       go n.watch()
+}
+
+// watch polls nacos for the service list every five seconds and passes the
+// returned names to updateServiceList.
+//
+// A failed poll is retried after ConnDelay * failTimes; the loop gives up once
+// more than MaxFailTimes consecutive polls have failed, or immediately when
+// the error is zookeeper.ErrNilNode. The loop also returns as soon as n.exit
+// is closed, so Close (which waits on n.wg) cannot block forever.
+func (n *nacosAppListener) watch() {
+       defer n.wg.Done()
+
+       // pause waits for d or for the listener to be closed, whichever comes
+       // first, and reports whether the loop should keep running. A fresh
+       // timer is used for every wait so a stale tick can never cut a
+       // back-off short.
+       pause := func(d time.Duration) bool {
+               timer := time.NewTimer(d)
+               defer timer.Stop()
+               select {
+               case <-n.exit:
+                       return false
+               case <-timer.C:
+                       return true
+               }
+       }
+
+       var failTimes int64
+       for {
+               serviceList, err := n.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+                       GroupName: n.regConf.Group,
+                       NameSpace: n.regConf.Namespace,
+                       PageSize:  100,
+               })
+               if err != nil {
+                       failTimes++
+                       logger.Infof("watching nacos interface with error{%v}", err)
+                       // Exit the watch if root node is in error
+                       // TODO: do not use zookeeper error
+                       if err == zookeeper.ErrNilNode {
+                               logger.Errorf("watching nacos services got errNilNode,so exit listen")
+                               return
+                       }
+                       if failTimes > MaxFailTimes {
+                               logger.Errorf("Error happens on nacos exceed max fail times: %d,so exit listen", MaxFailTimes)
+                               return
+                       }
+                       if !pause(ConnDelay * time.Duration(failTimes)) {
+                               return
+                       }
+                       continue
+               }
+               failTimes = 0
+               if err := n.updateServiceList(serviceList.Doms); err != nil {
+                       logger.Errorf("update service list failed %s", err)
+               }
+               if !pause(5 * time.Second) {
+                       return
+               }
+       }
+}
+
+// applicationInfo pairs an application name reported by nacos with the
+// listener that receives that application's subscription callbacks.
+type applicationInfo struct {
+       appName  string
+       listener *appServiceListener
+}
+
+// String returns the application name; it is used as the key of appInfoMap.
+func (a *applicationInfo) String() string {
+       return a.appName
+}
+
+// fromServiceKey wraps a nacos service name in an applicationInfo.
+//
+// A name containing ":" belongs to an interface-level registration, which the
+// application listener does not handle, so nil is returned for it.
+func fromServiceKey(serviceKey string) *applicationInfo {
+       if isInterfaceKey := strings.Contains(serviceKey, ":"); isInterfaceKey {
+               return nil
+       }
+       info := new(applicationInfo)
+       info.appName = serviceKey
+       return info
+}
+
+func (n *nacosAppListener) updateServiceList(serviceList []string) error {
+       // add new service info and watch
+       newServiceMap := make(map[string]struct{}, len(serviceList))
+
+       for _, v := range serviceList {
+               appInfo := fromServiceKey(v)
+               if appInfo == nil {
+                       // ignore interface registry
+                       continue
+               }
+               key := appInfo.String()
+               newServiceMap[key] = struct{}{}
+               if _, ok := n.appInfoMap[key]; !ok {
+                       l := newNacosAppSrvListener(n.client, n.adapterListener)
+                       l.wg.Add(1)
+
+                       appInfo.listener = l
+                       n.appInfoMap[key] = appInfo
+
+                       sub := &vo.SubscribeParam{
+                               ServiceName:       appInfo.appName,
+                               SubscribeCallback: l.Callback,
+                               GroupName:         n.regConf.Group,
+                       }
+
+                       if err := n.client.Subscribe(sub); err != nil {
+                               logger.Errorf("subscribe listener with 
interfaceKey = %s, error = %s", l, err)
+                       }
+                       l.wg.Done()
+               }
+       }
+
+       // handle deleted service
+       for k, v := range n.appInfoMap {
+               if _, ok := newServiceMap[k]; !ok {
+                       delete(n.appInfoMap, k)
+                       v.listener.Close()
+               }
+       }
+
+       return nil
+}
diff --git 
a/pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go 
b/pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go
new file mode 100644
index 00000000..92614468
--- /dev/null
+++ b/pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+       "fmt"
+       "reflect"
+       "strconv"
+       "strings"
+       "sync"
+)
+
+import (
+       dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
+       dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant"
+       dr "dubbo.apache.org/dubbo-go/v3/registry"
+       "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
+       "dubbo.apache.org/dubbo-go/v3/remoting"
+       "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
+       "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+       nacosModel "github.com/nacos-group/nacos-sdk-go/model"
+)
+
+import (
+       common2 
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+var _ registry.Listener = new(appServiceListener)
+
+// appServiceListener receives the nacos subscription callbacks for a single
+// application and turns instance changes into API add/remove notifications.
+type appServiceListener struct {
+       client      naming_client.INamingClient
+       // instanceMap caches the last seen instances, keyed by "ip:port".
+       instanceMap map[string]nacosModel.Instance
+       // cacheLock is held by Callback while it reads and replaces instanceMap.
+       cacheLock   sync.Mutex
+
+       // exit is closed by Close.
+       exit            chan struct{}
+       // wg is waited on by Close.
+       wg              sync.WaitGroup
+       // adapterListener receives OnAddAPI / OnRemoveAPI calls from handle.
+       adapterListener common2.RegistryEventListener
+}
+
+// newNacosAppSrvListener returns an appServiceListener with an open exit
+// channel and an empty instance cache.
+func newNacosAppSrvListener(client naming_client.INamingClient, 
adapterListener common2.RegistryEventListener) *appServiceListener {
+       return &appServiceListener{
+               client:          client,
+               exit:            make(chan struct{}),
+               adapterListener: adapterListener,
+               instanceMap:     map[string]nacosModel.Instance{},
+       }
+}
+
+// WatchAndHandle exists only to satisfy registry.Listener. It is not
+// implemented and panics when called.
+// NOTE(review): this listener appears to be driven solely through Callback,
+// so nothing should call this method - confirm before relying on it.
+func (l *appServiceListener) WatchAndHandle() {
+       panic("implement me")
+}
+
+// Close closes the exit channel and waits for everything registered on l.wg.
+// Calling it twice panics because the channel would be closed a second time.
+func (l *appServiceListener) Close() {
+       close(l.exit)
+       l.wg.Wait()
+}
+
+// Callback is the nacos subscription callback for one application.
+//
+// It diffs the enabled instances in services against the cached instanceMap
+// (keyed by "ip:port"), replaces the cache with the new snapshot and then
+// emits, in this order, an add event for every instance that was not cached,
+// a delete event for every cached instance that disappeared, and an update
+// event for every cached instance whose data changed. Disabled instances are
+// ignored. A non-nil err is logged and nothing else happens.
+//
+// cacheLock is held for the whole call, including the event notifications.
+func (l *appServiceListener) Callback(services []nacosModel.SubscribeService, err error) {
+       if err != nil {
+               logger.Errorf("nacos subscribe callback error:%s", err.Error())
+               return
+       }
+
+       addInstances := make([]nacosModel.Instance, 0, len(services))
+       delInstances := make([]nacosModel.Instance, 0, len(services))
+       updateInstances := make([]nacosModel.Instance, 0, len(services))
+       newInstanceMap := make(map[string]nacosModel.Instance, len(services))
+
+       l.cacheLock.Lock()
+       defer l.cacheLock.Unlock()
+       for i := range services {
+               if !services[i].Enable {
+                       // instance is not available, so ignore it
+                       continue
+               }
+               host := services[i].Ip + ":" + strconv.Itoa(int(services[i].Port))
+               services[i].ServiceName = handleServiceName(services[i].ServiceName)
+               instance := generateInstance(services[i])
+               newInstanceMap[host] = instance
+               if old, ok := l.instanceMap[host]; !ok {
+                       // instance does not exist in cache, add it to cache
+                       addInstances = append(addInstances, instance)
+               } else if !reflect.DeepEqual(old, instance) {
+                       // instance differs from the cached one, update it
+                       updateInstances = append(updateInstances, instance)
+               }
+       }
+
+       for host, inst := range l.instanceMap {
+               if _, ok := newInstanceMap[host]; !ok {
+                       // cache instance does not exist in new instance list, remove it from cache
+                       delInstances = append(delInstances, inst)
+               }
+       }
+
+       l.instanceMap = newInstanceMap
+       for i := range addInstances {
+               newURLs := l.getURLs(addInstances[i])
+               for _, url := range newURLs {
+                       l.handle(url, remoting.EventTypeAdd)
+               }
+       }
+       for i := range delInstances {
+               newURLs := l.getURLs(delInstances[i])
+               for _, url := range newURLs {
+                       l.handle(url, remoting.EventTypeDel)
+               }
+       }
+       for i := range updateInstances {
+               newURLs := l.getURLs(updateInstances[i])
+               for _, url := range newURLs {
+                       l.handle(url, remoting.EventTypeUpdate)
+               }
+       }
+}
+
+func (l *appServiceListener) handle(url *dubboCommon.URL, action 
remoting.EventType) {
+       logger.Infof("update begin, service event : %v %v", action, url)
+
+       // NOTE: _ is methods, we can not get methods by application discovery
+       bkConfig, _, location, err := registry.ParseDubboString(url.String())
+       if err != nil {
+               logger.Errorf("parse dubbo url error = %s", err)
+               return
+       }
+
+       apiPattern := registry.GetAPIPattern(bkConfig)
+       mappingParams := []config.MappingParam{
+               {
+                       Name:  "requestBody.values",
+                       MapTo: "opt.values",
+               },
+               {
+                       Name:  "requestBody.types",
+                       MapTo: "opt.types",
+               },
+       }
+
+       api := registry.CreateAPIConfig(apiPattern, location, bkConfig, 
constant.AnyValue, mappingParams)
+       if action == remoting.EventTypeDel {
+               if err := l.adapterListener.OnRemoveAPI(api); err != nil {
+                       logger.Errorf("Error={%s} happens when try to remove 
api %s", err.Error(), api.Path)
+                       return
+               }
+       } else {
+               if err := l.adapterListener.OnAddAPI(api); err != nil {
+                       logger.Errorf("Error={%s} happens when try to add api 
%s", err.Error(), api.Path)
+                       return
+               }
+       }
+}
+
+// getURLs resolves the dubbo URLs exported by one nacos instance. It looks up
+// the instance's metadata info using the revision stored in the instance
+// metadata, attaches it to the instance and collects the URLs of every
+// service listed there. If the metadata lookup fails the error is logged and
+// nil is returned.
+func (l *appServiceListener) getURLs(nmis nacosModel.Instance) []*dubboCommon.URL {
+       inst := toNacosInstance(nmis)
+       revision := inst.GetMetadata()[dubboConst.ExportedServicesRevisionPropertyName]
+       info, err := servicediscovery.GetMetadataInfo(inst.GetServiceName(), inst, revision)
+       if err != nil {
+               logger.Errorf("get instance metadata info error %v", err.Error())
+               return nil
+       }
+       inst.SetServiceMetadata(info)
+
+       result := make([]*dubboCommon.URL, 0, len(info.Services))
+       for _, svc := range info.Services {
+               result = append(result, inst.ToURLs(svc)...)
+       }
+       return result
+}
+
+// toNacosInstance converts a nacos instance into the dubbo registry's
+// ServiceInstance, copying the metadata into a fresh string map.
+func toNacosInstance(nmis nacosModel.Instance) dr.ServiceInstance {
+       inst := &dr.DefaultServiceInstance{
+               ID:          nmis.InstanceId,
+               ServiceName: nmis.ServiceName,
+               Host:        nmis.Ip,
+               Port:        int(nmis.Port),
+               Enable:      nmis.Enable,
+               Healthy:     nmis.Healthy,
+               Metadata:    make(map[string]string, len(nmis.Metadata)),
+       }
+       for key, val := range nmis.Metadata {
+               inst.Metadata[key] = fmt.Sprint(val)
+       }
+       return inst
+}
+
+// handleServiceName strips the group prefix from a nacos service name of the
+// form "group@@serviceName" and returns the serviceName part. A name without
+// the "@@" separator is returned unchanged instead of being replaced by an
+// empty string, so an unprefixed name is not silently lost.
+func handleServiceName(serviceName string) string {
+       parts := strings.Split(serviceName, "@@")
+       if len(parts) > 1 {
+               return parts[1]
+       }
+       return serviceName
+}
diff --git a/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go 
b/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
index 3588f33f..3536470f 100644
--- a/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
+++ b/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
@@ -71,27 +71,27 @@ func newNacosIntfListener(client 
naming_client.INamingClient, reg *NacosRegistry
        }
 }
 
-func (z *nacosIntfListener) Close() {
-       close(z.exit)
-       z.wg.Wait()
+func (n *nacosIntfListener) Close() {
+       close(n.exit)
+       n.wg.Wait()
 }
 
-func (z *nacosIntfListener) WatchAndHandle() {
-       z.wg.Add(1)
-       go z.watch()
+func (n *nacosIntfListener) WatchAndHandle() {
+       n.wg.Add(1)
+       go n.watch()
 }
 
-func (z *nacosIntfListener) watch() {
-       defer z.wg.Done()
+func (n *nacosIntfListener) watch() {
+       defer n.wg.Done()
        var (
                failTimes  int64 = 0
                delayTimer       = time.NewTimer(ConnDelay * 
time.Duration(failTimes))
        )
        defer delayTimer.Stop()
        for {
-               serviceList, err := 
z.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
-                       GroupName: z.regConf.Group,
-                       NameSpace: z.regConf.Namespace,
+               serviceList, err := 
n.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+                       GroupName: n.regConf.Group,
+                       NameSpace: n.regConf.Namespace,
                        PageSize:  100,
                })
                // error handling
@@ -112,7 +112,7 @@ func (z *nacosIntfListener) watch() {
                        continue
                }
                failTimes = 0
-               if err := z.updateServiceList(serviceList.Doms); err != nil {
+               if err := n.updateServiceList(serviceList.Doms); err != nil {
                        logger.Errorf("update service list failed %s", err)
                }
                time.Sleep(time.Second * 5)
@@ -142,10 +142,9 @@ func fromServiceFullKey(fullKey string) *serviceInfo {
        }
 }
 
-func (z *nacosIntfListener) updateServiceList(serviceList []string) error {
+func (n *nacosIntfListener) updateServiceList(serviceList []string) error {
        // add new service info and watch
-
-       newServiceMap := make(map[string]bool)
+       newServiceMap := make(map[string]struct{})
 
        for _, v := range serviceList {
                svcInfo := fromServiceFullKey(v)
@@ -154,45 +153,40 @@ func (z *nacosIntfListener) updateServiceList(serviceList 
[]string) error {
                        continue
                }
                key := svcInfo.String()
-               newServiceMap[key] = true
-               if _, ok := z.serviceInfoMap[key]; !ok {
-
+               newServiceMap[key] = struct{}{}
+               if _, ok := n.serviceInfoMap[key]; !ok {
                        url, _ := dubboCommon.NewURL("mock://localhost:8848")
                        url.SetParam(constant.InterfaceKey, 
svcInfo.interfaceName)
                        url.SetParam(constant.GroupKey, svcInfo.group)
                        url.SetParam(constant.VersionKey, svcInfo.version)
-                       l := newNacosSrvListener(url, z.client, 
z.adapterListener)
+                       l := newNacosSrvListener(url, n.client, 
n.adapterListener)
                        l.wg.Add(1)
 
                        svcInfo.listener = l
-                       z.serviceInfoMap[key] = svcInfo
-
-                       go func(v *serviceInfo) {
-                               defer l.wg.Done()
+                       n.serviceInfoMap[key] = svcInfo
 
-                               sub := &vo.SubscribeParam{
-                                       ServiceName:       
getSubscribeName(url),
-                                       SubscribeCallback: l.Callback,
-                                       GroupName:         z.regConf.Group,
-                               }
+                       sub := &vo.SubscribeParam{
+                               ServiceName:       getSubscribeName(url),
+                               SubscribeCallback: l.Callback,
+                               GroupName:         n.regConf.Group,
+                       }
 
-                               if err := z.client.Subscribe(sub); err != nil {
-                                       logger.Errorf("subscribe listener with 
interfaceKey = %s, error = %s", l, err)
-                               }
-                       }(svcInfo)
+                       if err := n.client.Subscribe(sub); err != nil {
+                               logger.Errorf("subscribe listener with 
interfaceKey = %s, error = %s", l, err)
+                       }
+                       l.wg.Done()
                }
        }
 
        // handle deleted service
-       for k, v := range z.serviceInfoMap {
+       for k, v := range n.serviceInfoMap {
                if _, ok := newServiceMap[k]; !ok {
-                       delete(z.serviceInfoMap, k)
+                       delete(n.serviceInfoMap, k)
                        v.listener.Close()
                }
        }
 
        return nil
-
 }
 
 func getSubscribeName(url *dubboCommon.URL) string {
diff --git a/pkg/adapter/dubboregistry/registry/nacos/registry.go 
b/pkg/adapter/dubboregistry/registry/nacos/registry.go
index 72eb15f4..5b287e15 100644
--- a/pkg/adapter/dubboregistry/registry/nacos/registry.go
+++ b/pkg/adapter/dubboregistry/registry/nacos/registry.go
@@ -45,11 +45,11 @@ type NacosRegistry struct {
 }
 
 func (n *NacosRegistry) DoSubscribe() error {
-       intfListener, ok := n.nacosListeners[n.RegisteredType]
+       Listener, ok := n.nacosListeners[n.RegisteredType]
        if !ok {
                return errors.New("Listener for interface level registration 
does not initialized")
        }
-       go intfListener.WatchAndHandle()
+       go Listener.WatchAndHandle()
        return nil
 }
 
@@ -95,8 +95,8 @@ func newNacosRegistry(regConfig model.Registry, 
adapterListener common.RegistryE
        switch nacosRegistry.RegisteredType {
        case registry.RegisteredTypeInterface:
                nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = 
newNacosIntfListener(client, nacosRegistry, &regConfig, adapterListener)
-       //case registry.RegisteredTypeApplication:
-       //nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = 
newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener)
+       case registry.RegisteredTypeApplication:
+               nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] = 
newNacosAppListener(client, nacosRegistry, &regConfig, adapterListener)
        default:
                return nil, errors.Errorf("Unsupported registry type: %s", 
regConfig.RegistryType)
        }
diff --git a/pkg/adapter/dubboregistry/registry/nacos/service_listener.go 
b/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
index b24bef32..0fdb0490 100644
--- a/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
+++ b/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
@@ -130,7 +130,6 @@ func (z *serviceListener) Callback(services 
[]nacosModel.SubscribeService, err e
 }
 
 func (z *serviceListener) handle(url *dubboCommon.URL, action 
remoting.EventType) {
-
        logger.Infof("update begin, service event: %v %v", action, url)
 
        bkConfig, methods, location, err := 
registry.ParseDubboString(url.String())
@@ -167,6 +166,7 @@ func (z *serviceListener) handle(url *dubboCommon.URL, 
action remoting.EventType
        }
 }
 
+// TODO: this function is useless for service listener
 func (z *serviceListener) NotifyAll(e []*dubboRegistry.ServiceEvent, f func()) 
{
 }
 
@@ -184,7 +184,7 @@ func generateURL(instance nacosModel.Instance) 
*dubboCommon.URL {
        path := instance.Metadata["path"]
        myInterface := instance.Metadata["interface"]
        if len(path) == 0 && len(myInterface) == 0 {
-               logger.Errorf("nacos instance metadata does not have  both path 
key and interface key,instance:%+v", instance)
+               logger.Errorf("nacos instance metadata does not have both path 
key and interface key,instance:%+v", instance)
                return nil
        }
        if len(path) == 0 && len(myInterface) != 0 {
diff --git 
a/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go 
b/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
index 910b2c53..09cca576 100644
--- a/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
+++ b/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
@@ -75,7 +75,6 @@ func (z *zkAppListener) WatchAndHandle() {
 
 func (z *zkAppListener) watch() {
        defer z.wg.Done()
-
        var (
                failTimes  int64 = 0
                delayTimer       = time.NewTimer(ConnDelay * 
time.Duration(failTimes))
diff --git 
a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go 
b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
index 7f9cfb52..87024d3c 100644
--- 
a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
+++ 
b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
@@ -32,29 +32,29 @@ import (
        dr "dubbo.apache.org/dubbo-go/v3/registry"
        "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
        "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"
+       "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
+       "github.com/dubbogo/go-zookeeper/zk"
+)
 
+import (
        "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
        "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
        
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
-
-       "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
-
-       "github.com/dubbogo/go-zookeeper/zk"
 )
 
 var _ registry.Listener = new(applicationServiceListener)
 
 // applicationServiceListener normally monitors the /services/[:application]
 type applicationServiceListener struct {
-       urls            []*dubboCommon.URL
-       servicePath     string
-       client          *zookeeper.ZooKeeperClient
-       adapterListener common.RegistryEventListener
+       urls        []*dubboCommon.URL
+       servicePath string
+       client      *zookeeper.ZooKeeperClient
 
-       exit chan struct{}
-       wg   sync.WaitGroup
+       exit            chan struct{}
+       wg              sync.WaitGroup
+       adapterListener common.RegistryEventListener
 }
 
 // newApplicationServiceListener creates a new zk service listener
@@ -69,7 +69,6 @@ func newApplicationServiceListener(path string, client 
*zookeeper.ZooKeeperClien
 
 func (asl *applicationServiceListener) WatchAndHandle() {
        defer asl.wg.Done()
-
        var (
                failTimes  int64 = 0
                delayTimer       = time.NewTimer(ConnDelay * 
time.Duration(failTimes))
@@ -77,7 +76,6 @@ func (asl *applicationServiceListener) WatchAndHandle() {
        defer delayTimer.Stop()
        for {
                children, e, err := asl.client.GetChildrenW(asl.servicePath)
-               // error handling
                if err != nil {
                        failTimes++
                        logger.Infof("watching (path{%s}) = error{%v}", 
asl.servicePath, err)
diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go 
b/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
index 78f16dfb..c2826d15 100644
--- a/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
+++ b/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
@@ -82,7 +82,6 @@ func (z *zkIntfListener) watch() {
        defer delayTimer.Stop()
        for {
                _, e, err := z.client.GetChildrenW(z.path)
-               // error handling
                if err != nil {
                        failTimes++
                        logger.Infof("watching (path{%s}) = error{%v}", z.path, 
err)
diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go 
b/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
index c6697426..c49f9943 100644
--- a/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
+++ b/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
@@ -64,12 +64,10 @@ func newZkSrvListener(url *common.URL, path string, client 
*zookeeper.ZooKeeperC
 
 func (zkl *serviceListener) WatchAndHandle() {
        defer zkl.wg.Done()
-
        var (
                failTimes  int64 = 0
                delayTimer       = time.NewTimer(ConnDelay * 
time.Duration(failTimes))
        )
-
        for {
                children, e, err := zkl.client.GetChildrenW(zkl.path)
                // error handling

Reply via email to