This is an automated email from the ASF dual-hosted git repository.
baerwang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new f2773ea2 feat: supports nacos service discovery (#651)
f2773ea2 is described below
commit f2773ea261aa5a4f648330cb5dcc866c617e7c63
Author: marsevilspirit <[email protected]>
AuthorDate: Sun Mar 9 19:25:59 2025 +0800
feat: supports nacos service discovery (#651)
* support nacos application discovery
* delete debug info
* format import
* add some comments
* fix golangci-lint error
* delete todo
* fix some comment
* update
* change bool to struct{}
---
.../registry/nacos/application_listener.go | 174 +++++++++++++++++
.../registry/nacos/application_service_listener.go | 213 +++++++++++++++++++++
.../registry/nacos/interface_listener.go | 64 +++----
.../dubboregistry/registry/nacos/registry.go | 8 +-
.../registry/nacos/service_listener.go | 4 +-
.../registry/zookeeper/application_listener.go | 1 -
.../zookeeper/application_service_listener.go | 22 +--
.../registry/zookeeper/interface_listener.go | 1 -
.../registry/zookeeper/service_listener.go | 2 -
9 files changed, 432 insertions(+), 57 deletions(-)
diff --git a/pkg/adapter/dubboregistry/registry/nacos/application_listener.go
b/pkg/adapter/dubboregistry/registry/nacos/application_listener.go
new file mode 100644
index 00000000..bf693729
--- /dev/null
+++ b/pkg/adapter/dubboregistry/registry/nacos/application_listener.go
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+ "strings"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+import (
+ common2
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
+
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+var _ registry.Listener = new(nacosAppListener)
+
+type nacosAppListener struct {
+ exit chan struct{}
+ client naming_client.INamingClient
+ regConf *model.Registry
+ reg *NacosRegistry
+ wg sync.WaitGroup
+ addr string
+ adapterListener common2.RegistryEventListener
+ appInfoMap map[string]*applicationInfo
+}
+
+// newNacosAppListener returns a new nacosAppListener with pre-defined path
according to the registered type.
+func newNacosAppListener(client naming_client.INamingClient, reg
*NacosRegistry, regConf *model.Registry, adapterListener
common2.RegistryEventListener) registry.Listener {
+ return &nacosAppListener{
+ exit: make(chan struct{}),
+ client: client,
+ regConf: regConf,
+ reg: reg,
+ addr: regConf.Address,
+ adapterListener: adapterListener,
+ appInfoMap: map[string]*applicationInfo{},
+ }
+}
+
+func (n *nacosAppListener) Close() {
+ close(n.exit)
+ n.wg.Wait()
+}
+
+func (n *nacosAppListener) WatchAndHandle() {
+ n.wg.Add(1)
+ go n.watch()
+}
+
+func (n *nacosAppListener) watch() {
+ defer n.wg.Done()
+ var (
+ failTimes int64 = 0
+ delayTimer = time.NewTimer(ConnDelay *
time.Duration(failTimes))
+ )
+ defer delayTimer.Stop()
+ for {
+ serviceList, err :=
n.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+ GroupName: n.regConf.Group,
+ NameSpace: n.regConf.Namespace,
+ PageSize: 100,
+ })
+ if err != nil {
+ failTimes++
+ logger.Infof("watching nacos interface with error{%v}",
err)
+ // Exit the watch if root node is in error
+ // TODO: do not use zookeeper error
+ if err == zookeeper.ErrNilNode {
+ logger.Errorf("watching nacos services got
errNilNode,so exit listen")
+ return
+ }
+ if failTimes > MaxFailTimes {
+ logger.Errorf("Error happens on nacos exceed
max fail times: %s,so exit listen", MaxFailTimes)
+ return
+ }
+ delayTimer.Reset(ConnDelay * time.Duration(failTimes))
+ <-delayTimer.C
+ continue
+ }
+ failTimes = 0
+ if err := n.updateServiceList(serviceList.Doms); err != nil {
+ logger.Errorf("update service list failed %s", err)
+ }
+ time.Sleep(time.Second * 5)
+ }
+}
+
+type applicationInfo struct {
+ appName string
+ listener *appServiceListener
+}
+
+func (a *applicationInfo) String() string {
+ return a.appName
+}
+
+func fromServiceKey(serviceKey string) *applicationInfo {
+ // if serviceKey contains ":" means it is a interface registry
+ // we should ignore it
+ if strings.Contains(serviceKey, ":") {
+ return nil
+ }
+ return &applicationInfo{
+ appName: serviceKey,
+ }
+}
+
+func (n *nacosAppListener) updateServiceList(serviceList []string) error {
+ // add new service info and watch
+ newServiceMap := make(map[string]struct{}, len(serviceList))
+
+ for _, v := range serviceList {
+ appInfo := fromServiceKey(v)
+ if appInfo == nil {
+ // ignore interface registry
+ continue
+ }
+ key := appInfo.String()
+ newServiceMap[key] = struct{}{}
+ if _, ok := n.appInfoMap[key]; !ok {
+ l := newNacosAppSrvListener(n.client, n.adapterListener)
+ l.wg.Add(1)
+
+ appInfo.listener = l
+ n.appInfoMap[key] = appInfo
+
+ sub := &vo.SubscribeParam{
+ ServiceName: appInfo.appName,
+ SubscribeCallback: l.Callback,
+ GroupName: n.regConf.Group,
+ }
+
+ if err := n.client.Subscribe(sub); err != nil {
+ logger.Errorf("subscribe listener with
interfaceKey = %s, error = %s", l, err)
+ }
+ l.wg.Done()
+ }
+ }
+
+ // handle deleted service
+ for k, v := range n.appInfoMap {
+ if _, ok := newServiceMap[k]; !ok {
+ delete(n.appInfoMap, k)
+ v.listener.Close()
+ }
+ }
+
+ return nil
+}
diff --git
a/pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go
b/pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go
new file mode 100644
index 00000000..92614468
--- /dev/null
+++ b/pkg/adapter/dubboregistry/registry/nacos/application_service_listener.go
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+ "fmt"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+import (
+ dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
+ dubboConst "dubbo.apache.org/dubbo-go/v3/common/constant"
+ dr "dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+ "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
+ "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ nacosModel "github.com/nacos-group/nacos-sdk-go/model"
+)
+
+import (
+ common2
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+var _ registry.Listener = new(appServiceListener)
+
+type appServiceListener struct {
+ client naming_client.INamingClient
+ instanceMap map[string]nacosModel.Instance
+ cacheLock sync.Mutex
+
+ exit chan struct{}
+ wg sync.WaitGroup
+ adapterListener common2.RegistryEventListener
+}
+
+func newNacosAppSrvListener(client naming_client.INamingClient,
adapterListener common2.RegistryEventListener) *appServiceListener {
+ return &appServiceListener{
+ client: client,
+ exit: make(chan struct{}),
+ adapterListener: adapterListener,
+ instanceMap: map[string]nacosModel.Instance{},
+ }
+}
+
+func (l *appServiceListener) WatchAndHandle() {
+ panic("implement me")
+}
+
+func (l *appServiceListener) Close() {
+ close(l.exit)
+ l.wg.Wait()
+}
+
+func (l *appServiceListener) Callback(services []nacosModel.SubscribeService,
err error) {
+ if err != nil {
+ logger.Errorf("nacos subscribe callback error:%s", err.Error())
+ return
+ }
+
+ addInstances := make([]nacosModel.Instance, 0, len(services))
+ delInstances := make([]nacosModel.Instance, 0, len(services))
+ updateInstances := make([]nacosModel.Instance, 0, len(services))
+ newInstanceMap := make(map[string]nacosModel.Instance, len(services))
+
+ l.cacheLock.Lock()
+ defer l.cacheLock.Unlock()
+ for i := range services {
+ if !services[i].Enable {
+ // instance is not available, so ignore it
+ continue
+ }
+ host := services[i].Ip + ":" +
strconv.Itoa(int(services[i].Port))
+ services[i].ServiceName =
handleServiceName(services[i].ServiceName)
+ instance := generateInstance(services[i])
+ newInstanceMap[host] = instance
+ if old, ok := l.instanceMap[host]; ok {
+ // instance does not exist in cache, add it to cache
+ addInstances = append(addInstances, instance)
+ } else {
+ if !reflect.DeepEqual(old, instance) {
+ // instance is not different from cache, update
it to cache
+ updateInstances = append(updateInstances,
instance)
+ }
+ }
+ }
+
+ for host, inst := range l.instanceMap {
+ if _, ok := newInstanceMap[host]; !ok {
+ // cache instance does not exist in new instance list,
remove it from cache
+ delInstances = append(delInstances, inst)
+ }
+ }
+
+ l.instanceMap = newInstanceMap
+ for i := range addInstances {
+ newURLs := l.getURLs(addInstances[i])
+ for _, url := range newURLs {
+ l.handle(url, remoting.EventTypeAdd)
+ }
+ }
+ for i := range delInstances {
+ newURLs := l.getURLs(delInstances[i])
+ for _, url := range newURLs {
+ l.handle(url, remoting.EventTypeDel)
+ }
+ }
+ for i := range updateInstances {
+ newURLs := l.getURLs(updateInstances[i])
+ for _, url := range newURLs {
+ l.handle(url, remoting.EventTypeUpdate)
+ }
+ }
+}
+
+func (l *appServiceListener) handle(url *dubboCommon.URL, action
remoting.EventType) {
+ logger.Infof("update begin, service event : %v %v", action, url)
+
+ // NOTE: _ is methods, we can not get methods by application discovery
+ bkConfig, _, location, err := registry.ParseDubboString(url.String())
+ if err != nil {
+ logger.Errorf("parse dubbo url error = %s", err)
+ return
+ }
+
+ apiPattern := registry.GetAPIPattern(bkConfig)
+ mappingParams := []config.MappingParam{
+ {
+ Name: "requestBody.values",
+ MapTo: "opt.values",
+ },
+ {
+ Name: "requestBody.types",
+ MapTo: "opt.types",
+ },
+ }
+
+ api := registry.CreateAPIConfig(apiPattern, location, bkConfig,
constant.AnyValue, mappingParams)
+ if action == remoting.EventTypeDel {
+ if err := l.adapterListener.OnRemoveAPI(api); err != nil {
+ logger.Errorf("Error={%s} happens when try to remove
api %s", err.Error(), api.Path)
+ return
+ }
+ } else {
+ if err := l.adapterListener.OnAddAPI(api); err != nil {
+ logger.Errorf("Error={%s} happens when try to add api
%s", err.Error(), api.Path)
+ return
+ }
+ }
+}
+
+func (l *appServiceListener) getURLs(nmis nacosModel.Instance)
[]*dubboCommon.URL {
+ instance := toNacosInstance(nmis)
+ metadata := instance.GetMetadata()
+ metadataInfo, err :=
servicediscovery.GetMetadataInfo(instance.GetServiceName(), instance,
metadata[dubboConst.ExportedServicesRevisionPropertyName])
+ if err != nil {
+ logger.Errorf("get instance metadata info error %v",
err.Error())
+ return nil
+ }
+ instance.SetServiceMetadata(metadataInfo)
+ urls := make([]*dubboCommon.URL, 0, len(metadataInfo.Services))
+ for _, service := range metadataInfo.Services {
+ urls = append(urls, instance.ToURLs(service)...)
+ }
+ return urls
+}
+
+// toNacosInstance convert to registry's service instance
+func toNacosInstance(nmis nacosModel.Instance) dr.ServiceInstance {
+ md := make(map[string]string, len(nmis.Metadata))
+ for k, v := range nmis.Metadata {
+ md[k] = fmt.Sprint(v)
+ }
+ return &dr.DefaultServiceInstance{
+ ID: nmis.InstanceId,
+ ServiceName: nmis.ServiceName,
+ Host: nmis.Ip,
+ Port: int(nmis.Port),
+ Enable: nmis.Enable,
+ Healthy: nmis.Healthy,
+ Metadata: md,
+ }
+}
+
+// group@@serviceName convert to serviceName
+func handleServiceName(serviceName string) string {
+ parts := strings.Split(serviceName, "@@")
+ if len(parts) > 1 {
+ return parts[1]
+ }
+ return ""
+}
diff --git a/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
b/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
index 3588f33f..3536470f 100644
--- a/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
+++ b/pkg/adapter/dubboregistry/registry/nacos/interface_listener.go
@@ -71,27 +71,27 @@ func newNacosIntfListener(client
naming_client.INamingClient, reg *NacosRegistry
}
}
-func (z *nacosIntfListener) Close() {
- close(z.exit)
- z.wg.Wait()
+func (n *nacosIntfListener) Close() {
+ close(n.exit)
+ n.wg.Wait()
}
-func (z *nacosIntfListener) WatchAndHandle() {
- z.wg.Add(1)
- go z.watch()
+func (n *nacosIntfListener) WatchAndHandle() {
+ n.wg.Add(1)
+ go n.watch()
}
-func (z *nacosIntfListener) watch() {
- defer z.wg.Done()
+func (n *nacosIntfListener) watch() {
+ defer n.wg.Done()
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay *
time.Duration(failTimes))
)
defer delayTimer.Stop()
for {
- serviceList, err :=
z.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
- GroupName: z.regConf.Group,
- NameSpace: z.regConf.Namespace,
+ serviceList, err :=
n.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+ GroupName: n.regConf.Group,
+ NameSpace: n.regConf.Namespace,
PageSize: 100,
})
// error handling
@@ -112,7 +112,7 @@ func (z *nacosIntfListener) watch() {
continue
}
failTimes = 0
- if err := z.updateServiceList(serviceList.Doms); err != nil {
+ if err := n.updateServiceList(serviceList.Doms); err != nil {
logger.Errorf("update service list failed %s", err)
}
time.Sleep(time.Second * 5)
@@ -142,10 +142,9 @@ func fromServiceFullKey(fullKey string) *serviceInfo {
}
}
-func (z *nacosIntfListener) updateServiceList(serviceList []string) error {
+func (n *nacosIntfListener) updateServiceList(serviceList []string) error {
// add new service info and watch
-
- newServiceMap := make(map[string]bool)
+ newServiceMap := make(map[string]struct{})
for _, v := range serviceList {
svcInfo := fromServiceFullKey(v)
@@ -154,45 +153,40 @@ func (z *nacosIntfListener) updateServiceList(serviceList
[]string) error {
continue
}
key := svcInfo.String()
- newServiceMap[key] = true
- if _, ok := z.serviceInfoMap[key]; !ok {
-
+ newServiceMap[key] = struct{}{}
+ if _, ok := n.serviceInfoMap[key]; !ok {
url, _ := dubboCommon.NewURL("mock://localhost:8848")
url.SetParam(constant.InterfaceKey,
svcInfo.interfaceName)
url.SetParam(constant.GroupKey, svcInfo.group)
url.SetParam(constant.VersionKey, svcInfo.version)
- l := newNacosSrvListener(url, z.client,
z.adapterListener)
+ l := newNacosSrvListener(url, n.client,
n.adapterListener)
l.wg.Add(1)
svcInfo.listener = l
- z.serviceInfoMap[key] = svcInfo
-
- go func(v *serviceInfo) {
- defer l.wg.Done()
+ n.serviceInfoMap[key] = svcInfo
- sub := &vo.SubscribeParam{
- ServiceName:
getSubscribeName(url),
- SubscribeCallback: l.Callback,
- GroupName: z.regConf.Group,
- }
+ sub := &vo.SubscribeParam{
+ ServiceName: getSubscribeName(url),
+ SubscribeCallback: l.Callback,
+ GroupName: n.regConf.Group,
+ }
- if err := z.client.Subscribe(sub); err != nil {
- logger.Errorf("subscribe listener with
interfaceKey = %s, error = %s", l, err)
- }
- }(svcInfo)
+ if err := n.client.Subscribe(sub); err != nil {
+ logger.Errorf("subscribe listener with
interfaceKey = %s, error = %s", l, err)
+ }
+ l.wg.Done()
}
}
// handle deleted service
- for k, v := range z.serviceInfoMap {
+ for k, v := range n.serviceInfoMap {
if _, ok := newServiceMap[k]; !ok {
- delete(z.serviceInfoMap, k)
+ delete(n.serviceInfoMap, k)
v.listener.Close()
}
}
return nil
-
}
func getSubscribeName(url *dubboCommon.URL) string {
diff --git a/pkg/adapter/dubboregistry/registry/nacos/registry.go
b/pkg/adapter/dubboregistry/registry/nacos/registry.go
index 72eb15f4..5b287e15 100644
--- a/pkg/adapter/dubboregistry/registry/nacos/registry.go
+++ b/pkg/adapter/dubboregistry/registry/nacos/registry.go
@@ -45,11 +45,11 @@ type NacosRegistry struct {
}
func (n *NacosRegistry) DoSubscribe() error {
- intfListener, ok := n.nacosListeners[n.RegisteredType]
+ Listener, ok := n.nacosListeners[n.RegisteredType]
if !ok {
return errors.New("Listener for interface level registration
does not initialized")
}
- go intfListener.WatchAndHandle()
+ go Listener.WatchAndHandle()
return nil
}
@@ -95,8 +95,8 @@ func newNacosRegistry(regConfig model.Registry,
adapterListener common.RegistryE
switch nacosRegistry.RegisteredType {
case registry.RegisteredTypeInterface:
nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] =
newNacosIntfListener(client, nacosRegistry, ®Config, adapterListener)
- //case registry.RegisteredTypeApplication:
- //nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] =
newZkAppListener(zkReg.client, zkReg, zkReg.AdapterListener)
+ case registry.RegisteredTypeApplication:
+ nacosRegistry.nacosListeners[nacosRegistry.RegisteredType] =
newNacosAppListener(client, nacosRegistry, ®Config, adapterListener)
default:
return nil, errors.Errorf("Unsupported registry type: %s",
regConfig.RegistryType)
}
diff --git a/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
b/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
index b24bef32..0fdb0490 100644
--- a/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
+++ b/pkg/adapter/dubboregistry/registry/nacos/service_listener.go
@@ -130,7 +130,6 @@ func (z *serviceListener) Callback(services
[]nacosModel.SubscribeService, err e
}
func (z *serviceListener) handle(url *dubboCommon.URL, action
remoting.EventType) {
-
logger.Infof("update begin, service event: %v %v", action, url)
bkConfig, methods, location, err :=
registry.ParseDubboString(url.String())
@@ -167,6 +166,7 @@ func (z *serviceListener) handle(url *dubboCommon.URL,
action remoting.EventType
}
}
+// TODO: this function is useless for service listener
func (z *serviceListener) NotifyAll(e []*dubboRegistry.ServiceEvent, f func())
{
}
@@ -184,7 +184,7 @@ func generateURL(instance nacosModel.Instance)
*dubboCommon.URL {
path := instance.Metadata["path"]
myInterface := instance.Metadata["interface"]
if len(path) == 0 && len(myInterface) == 0 {
- logger.Errorf("nacos instance metadata does not have both path
key and interface key,instance:%+v", instance)
+ logger.Errorf("nacos instance metadata does not have both path
key and interface key,instance:%+v", instance)
return nil
}
if len(path) == 0 && len(myInterface) != 0 {
diff --git
a/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
b/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
index 910b2c53..09cca576 100644
--- a/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
+++ b/pkg/adapter/dubboregistry/registry/zookeeper/application_listener.go
@@ -75,7 +75,6 @@ func (z *zkAppListener) WatchAndHandle() {
func (z *zkAppListener) watch() {
defer z.wg.Done()
-
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay *
time.Duration(failTimes))
diff --git
a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
index 7f9cfb52..87024d3c 100644
---
a/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
+++
b/pkg/adapter/dubboregistry/registry/zookeeper/application_service_listener.go
@@ -32,29 +32,29 @@ import (
dr "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"
+ "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
+ "github.com/dubbogo/go-zookeeper/zk"
+)
+import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
-
- "github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
-
- "github.com/dubbogo/go-zookeeper/zk"
)
var _ registry.Listener = new(applicationServiceListener)
// applicationServiceListener normally monitors the /services/[:application]
type applicationServiceListener struct {
- urls []*dubboCommon.URL
- servicePath string
- client *zookeeper.ZooKeeperClient
- adapterListener common.RegistryEventListener
+ urls []*dubboCommon.URL
+ servicePath string
+ client *zookeeper.ZooKeeperClient
- exit chan struct{}
- wg sync.WaitGroup
+ exit chan struct{}
+ wg sync.WaitGroup
+ adapterListener common.RegistryEventListener
}
// newApplicationServiceListener creates a new zk service listener
@@ -69,7 +69,6 @@ func newApplicationServiceListener(path string, client
*zookeeper.ZooKeeperClien
func (asl *applicationServiceListener) WatchAndHandle() {
defer asl.wg.Done()
-
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay *
time.Duration(failTimes))
@@ -77,7 +76,6 @@ func (asl *applicationServiceListener) WatchAndHandle() {
defer delayTimer.Stop()
for {
children, e, err := asl.client.GetChildrenW(asl.servicePath)
- // error handling
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}",
asl.servicePath, err)
diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
b/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
index 78f16dfb..c2826d15 100644
--- a/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
+++ b/pkg/adapter/dubboregistry/registry/zookeeper/interface_listener.go
@@ -82,7 +82,6 @@ func (z *zkIntfListener) watch() {
defer delayTimer.Stop()
for {
_, e, err := z.client.GetChildrenW(z.path)
- // error handling
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}", z.path,
err)
diff --git a/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
b/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
index c6697426..c49f9943 100644
--- a/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
+++ b/pkg/adapter/dubboregistry/registry/zookeeper/service_listener.go
@@ -64,12 +64,10 @@ func newZkSrvListener(url *common.URL, path string, client
*zookeeper.ZooKeeperC
func (zkl *serviceListener) WatchAndHandle() {
defer zkl.wg.Done()
-
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay *
time.Duration(failTimes))
)
-
for {
children, e, err := zkl.client.GetChildrenW(zkl.path)
// error handling