This is an automated email from the ASF dual-hosted git repository. fangyc pushed a commit to branch 1.3 in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.3 by this push: new 7fcb34e add zk register code new 5f82281 Merge pull request #355 from pantianying/fix_zkproblemto1.3 7fcb34e is described below commit 7fcb34eb5612a5ad47c24ca4bc7aecc194f801e4 Author: pantianying <601666...@qq.com> AuthorDate: Sat Feb 8 12:34:14 2020 +0800 add zk register code --- config_center/zookeeper/impl.go | 63 +++--- config_center/zookeeper/impl_test.go | 2 +- go.mod | 2 +- go.sum | 4 +- registry/base_register.go | 375 +++++++++++++++++++++++++++++++++++ registry/etcdv3/listener.go | 10 +- registry/etcdv3/registry.go | 264 +++--------------------- registry/etcdv3/registry_test.go | 14 +- registry/registry.go | 12 +- registry/zookeeper/listener.go | 18 +- registry/zookeeper/registry.go | 354 ++++----------------------------- registry/zookeeper/registry_test.go | 4 +- remoting/etcdv3/client.go | 32 ++- remoting/etcdv3/facade.go | 7 +- remoting/etcdv3/listener.go | 9 +- remoting/listener.go | 6 + remoting/zookeeper/client.go | 128 ++++++++---- remoting/zookeeper/client_test.go | 11 +- remoting/zookeeper/facade.go | 7 +- remoting/zookeeper/facade_test.go | 4 +- remoting/zookeeper/listener.go | 75 +++---- remoting/zookeeper/listener_test.go | 5 +- 22 files changed, 689 insertions(+), 717 deletions(-) diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 504d491..70fb196 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -24,8 +24,8 @@ import ( ) import ( + "github.com/dubbogo/go-zookeeper/zk" perrors "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" ) import ( @@ -37,7 +37,11 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) -const ZkClient = "zk config_center" +const ( + // ZkClient + //zookeeper client name + ZkClient = "zk config_center" +) type zookeeperDynamicConfiguration struct { url *common.URL @@ -134,10 +138,9 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config content, _, err := c.client.GetContent(c.rootPath + "/" + key) if err != nil { return "", perrors.WithStack(err) - } else { - return string(content), nil } + return string(content), nil } //For zookeeper, getConfig and getConfigs have the same meaning. @@ -156,57 +159,57 @@ func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser) c.parser = p } -func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { - return r.client +func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient { + return c.client } -func (r *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { - r.client = client +func (c *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) { + c.client = client } -func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { - return &r.cltLock +func (c *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex { + return &c.cltLock } -func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { - return &r.wg +func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &c.wg } -func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} { - return r.done +func (c *zookeeperDynamicConfiguration) Done() chan struct{} { + return c.done } -func (r *zookeeperDynamicConfiguration) GetUrl() common.URL { - return *r.url +func (c *zookeeperDynamicConfiguration) GetUrl() common.URL { + return *c.url } -func (r *zookeeperDynamicConfiguration) Destroy() { - if r.listener != nil { - r.listener.Close() +func (c *zookeeperDynamicConfiguration) Destroy() { + if c.listener != nil { + c.listener.Close() } - close(r.done) - r.wg.Wait() - r.closeConfigs() + close(c.done) + c.wg.Wait() + c.closeConfigs() } -func (r *zookeeperDynamicConfiguration) IsAvailable() bool { +func (c *zookeeperDynamicConfiguration) IsAvailable() bool { select { - case <-r.done: + case <-c.done: return false default: return true } } -func (r *zookeeperDynamicConfiguration) closeConfigs() { - r.cltLock.Lock() - defer r.cltLock.Unlock() +func (c *zookeeperDynamicConfiguration) closeConfigs() { + c.cltLock.Lock() + defer c.cltLock.Unlock() logger.Infof("begin to close provider zk client") // Close the old client first to close the tmp node - r.client.Close() - r.client = nil + c.client.Close() + c.client = nil } -func (r *zookeeperDynamicConfiguration) RestartCallBack() bool { +func (c *zookeeperDynamicConfiguration) RestartCallBack() bool { return true } diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go index e614009..cca4427 100644 --- a/config_center/zookeeper/impl_test.go +++ b/config_center/zookeeper/impl_test.go @@ -24,7 +24,7 @@ import ( ) import ( - "github.com/samuel/go-zookeeper/zk" + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) diff --git a/go.mod b/go.mod index c89b397..f5ac8e5 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/creasty/defaults v1.3.0 github.com/dubbogo/getty v1.3.2 + github.com/dubbogo/go-zookeeper v1.0.0 github.com/dubbogo/gost v1.5.2 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-errors/errors v1.0.1 // indirect @@ -37,7 +38,6 @@ require ( github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 // indirect - github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/satori/go.uuid v1.2.0 github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect github.com/soheilhy/cmux v0.1.4 // indirect diff --git a/go.sum b/go.sum index b23cb24..0f42e86 100644 --- a/go.sum +++ b/go.sum @@ -104,6 +104,8 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dubbogo/getty v1.3.2 h1:l1KVSs/1CtTKbIPTrkTtBT6S9ddvmswDGoAnnl2CDpM= github.com/dubbogo/getty v1.3.2/go.mod h1:ANbVQ9tbpZ2b0xdR8nRrgS/oXIsZAeRxzvPSOn/7mbk= +github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM= +github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/gost v1.5.1 h1:oG5dzaWf1KYynBaBoUIOkgT+YD0niHV6xxI0Odq7hDg= github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8= @@ -410,8 +412,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 h1:7YvPJVmEeFHR1Tj9sZEYsmarJEQfMVYpd/Vyy/A8dqE= github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= -github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY= -github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= diff --git a/registry/base_register.go b/registry/base_register.go new file mode 100644 index 0000000..5b9aef8 --- /dev/null +++ b/registry/base_register.go @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + "context" + "fmt" + "net/url" + "os" + "strconv" + "strings" + "sync" + "time" +) + +import ( + gxnet "github.com/dubbogo/gost/net" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +const ( + // RegistryConnDelay connection delay + RegistryConnDelay = 3 + // MaxWaitInterval max wait interval + MaxWaitInterval = 3 * time.Second +) + +var ( + processID = "" + localIP = "" +) + +func init() { + processID = fmt.Sprintf("%d", os.Getpid()) + localIP, _ = gxnet.GetLocalIP() +} + +/* + * -----------------------------------NOTICE--------------------------------------------- + * If there is no special case, you'd better inherit BaseRegistry and implement the + * FacadeBasedRegistry interface instead of directly implementing the Registry interface. + * -------------------------------------------------------------------------------------- + */ + +/* + * FacadeBasedRegistry interface is subclass of Registry, and it is designed for registry who want to inherit BaseRegistry. + * You have to implement the interface to inherit BaseRegistry. + */ +type FacadeBasedRegistry interface { + Registry + CreatePath(string) error + DoRegister(string, string) error + DoSubscribe(conf *common.URL) (Listener, error) + CloseAndNilClient() + CloseListener() + InitListeners() +} + +// BaseRegistry is a common logic abstract for registry. It implement Registry interface. +type BaseRegistry struct { + context context.Context + facadeBasedRegistry FacadeBasedRegistry + *common.URL + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + cltLock sync.Mutex //ctl lock is a lock for services map + services map[string]common.URL // service name + protocol -> service config, for store the service registered +} + +// InitBaseRegistry for init some local variables and set BaseRegistry's subclass to it +func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry { + r.URL = url + r.birth = time.Now().UnixNano() + r.done = make(chan struct{}) + r.services = make(map[string]common.URL) + r.facadeBasedRegistry = facadeRegistry + return r +} + +// GetUrl for get registry's url +func (r *BaseRegistry) GetUrl() common.URL { + return *r.URL +} + +// Destroy for graceful down +func (r *BaseRegistry) Destroy() { + //first step close registry's all listeners + r.facadeBasedRegistry.CloseListener() + // then close r.done to notify other program who listen to it + close(r.done) + // wait waitgroup done (wait listeners outside close over) + r.wg.Wait() + //close registry client + r.closeRegisters() +} + +// Register implement interface registry to register +func (r *BaseRegistry) Register(conf common.URL) error { + var ( + ok bool + err error + ) + role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) + // Check if the service has been registered + r.cltLock.Lock() + _, ok = r.services[conf.Key()] + r.cltLock.Unlock() + if ok { + return perrors.Errorf("Path{%s} has been registered", conf.Key()) + } + + err = r.register(conf) + if err != nil { + return perrors.WithMessagef(err, "register(conf:%+v)", conf) + } + + r.cltLock.Lock() + r.services[conf.Key()] = conf + r.cltLock.Unlock() + logger.Debugf("(%sRegistry)Register(conf{%#v})", common.DubboRole[role], conf) + + return nil +} + +// service is for getting service path stored in url +func (r *BaseRegistry) service(c common.URL) string { + return url.QueryEscape(c.Service()) +} + +// RestartCallBack for reregister when reconnect +func (r *BaseRegistry) RestartCallBack() bool { + + // copy r.services + services := []common.URL{} + for _, confIf := range r.services { + services = append(services, confIf) + } + + flag := true + for _, confIf := range services { + err := r.register(confIf) + if err != nil { + logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf, perrors.WithStack(err)) + flag = false + break + } + logger.Infof("success to re-register service :%v", confIf.Key()) + } + r.facadeBasedRegistry.InitListeners() + + return flag +} + +// register for register url to registry, include init params +func (r *BaseRegistry) register(c common.URL) error { + var ( + err error + //revision string + params url.Values + rawURL string + encodedURL string + dubboPath string + //conf config.URL + ) + params = url.Values{} + + c.RangeParams(func(key, value string) bool { + params.Add(key, value) + return true + }) + + params.Add("pid", processID) + params.Add("ip", localIP) + //params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6)) + + role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) + switch role { + + case common.PROVIDER: + dubboPath, rawURL, err = r.providerRegistry(c, params) + case common.CONSUMER: + dubboPath, rawURL, err = r.consumerRegistry(c, params) + default: + return perrors.Errorf("@c{%v} type is not referencer or provider", c) + } + encodedURL = url.QueryEscape(rawURL) + dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") + err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL) + + if err != nil { + return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL) + } + return nil +} + +// providerRegistry for provider role do +func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) { + var ( + dubboPath string + rawURL string + err error + ) + if c.Path == "" || len(c.Methods) == 0 { + return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) + r.cltLock.Lock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) + return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath) + } + params.Add("anyhost", "true") + + // Dubbo java consumer to start looking for the provider url,because the category does not match, + // the provider will not find, causing the consumer can not start, so we use consumers. + // DubboRole = [...]string{"consumer", "", "", "provider"} + // params.Add("category", (RoleType(PROVIDER)).Role()) + params.Add("category", (common.RoleType(common.PROVIDER)).String()) + params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) + + params.Add("side", (common.RoleType(common.PROVIDER)).Role()) + + if len(c.Methods) == 0 { + params.Add("methods", strings.Join(c.Methods, ",")) + } + logger.Debugf("provider url params:%#v", params) + var host string + if c.Ip == "" { + host = localIP + ":" + c.Port + } else { + host = c.Ip + ":" + c.Port + } + + rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode()) + // Print your own registration service providers. + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String()) + logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) + return dubboPath, rawURL, nil +} + +// consumerRegistry for consumer role do +func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) { + var ( + dubboPath string + rawURL string + err error + ) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) + r.cltLock.Lock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) + return "", "", perrors.WithStack(err) + } + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) + r.cltLock.Lock() + err = r.facadeBasedRegistry.CreatePath(dubboPath) + r.cltLock.Unlock() + if err != nil { + logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) + return "", "", perrors.WithStack(err) + } + + params.Add("protocol", c.Protocol) + params.Add("category", (common.RoleType(common.CONSUMER)).String()) + params.Add("dubbo", "dubbogo-consumer-"+constant.Version) + + rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) + dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String()) + + logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) + return dubboPath, rawURL, nil +} + +// sleepWait... +func sleepWait(n int) { + wait := time.Duration((n + 1) * 2e8) + if wait > MaxWaitInterval { + wait = MaxWaitInterval + } + time.Sleep(wait) +} + +// Subscribe :subscribe from registry, event will notify by notifyListener +func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { + n := 0 + for { + n++ + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return + } + + listener, err := r.facadeBasedRegistry.DoSubscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return + } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) + time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + continue + } + + for { + if serviceEvent, err := listener.Next(); err != nil { + logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) + listener.Close() + break + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) + } + + } + sleepWait(n) + } +} + +// closeRegisters close and remove registry client and reset services map +func (r *BaseRegistry) closeRegisters() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + logger.Infof("begin to close provider client") + // Close and remove(set to nil) the registry client + r.facadeBasedRegistry.CloseAndNilClient() + // reset the services map + r.services = nil +} + +// IsAvailable judge to is registry not closed by chan r.done +func (r *BaseRegistry) IsAvailable() bool { + select { + case <-r.done: + return false + default: + return true + } +} + +// WaitGroup open for outside add the waitgroup to add some logic before registry destroyed over(graceful down) +func (r *BaseRegistry) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +// Done open for outside to listen the event of registry Destroy() called. +func (r *BaseRegistry) Done() chan struct{} { + return r.done +} diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 31d62fa..5ed56f6 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -39,6 +39,7 @@ type dataListener struct { listener config_center.ConfigurationListener } +// NewRegistryDataListener ... func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { return &dataListener{listener: listener, interestedURL: []*common.URL{}} } @@ -77,9 +78,10 @@ type configurationListener struct { events chan *config_center.ConfigChangeEvent } +// NewConfigurationListener for listening the event of etcdv3. func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { // add a new waiter - reg.wg.Add(1) + reg.WaitGroup().Add(1) return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { @@ -89,7 +91,7 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { - case <-l.registry.done: + case <-l.registry.Done(): logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") return nil, perrors.New("listener stopped") @@ -97,7 +99,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { logger.Infof("got etcd event %#v", e) if e.ConfigType == remoting.EventTypeDel { select { - case <-l.registry.done: + case <-l.registry.Done(): logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) default: } @@ -108,5 +110,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } func (l *configurationListener) Close() { - l.registry.wg.Done() + l.registry.WaitGroup().Done() } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index b058113..e1c2576 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -19,17 +19,13 @@ package etcdv3 import ( "fmt" - "net/url" - "os" "path" - "strconv" "strings" "sync" "time" ) import ( - gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) @@ -42,74 +38,39 @@ import ( "github.com/apache/dubbo-go/remoting/etcdv3" ) -var ( - processID = "" - localIP = "" -) - const ( - Name = "etcdv3" - RegistryConnDelay = 3 + // Name module name + Name = "etcdv3" ) func init() { - processID = fmt.Sprintf("%d", os.Getpid()) - localIP, _ = gxnet.GetLocalIP() extension.SetRegistry(Name, newETCDV3Registry) } type etcdV3Registry struct { - *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - - cltLock sync.Mutex - client *etcdv3.Client - services map[string]common.URL // service name + protocol -> service config - + registry.BaseRegistry + cltLock sync.Mutex + client *etcdv3.Client listenerLock sync.Mutex listener *etcdv3.EventListener dataListener *dataListener configListener *configurationListener - - wg sync.WaitGroup // wg+done for etcd client restart - done chan struct{} } +// Client get the etcdv3 client func (r *etcdV3Registry) Client() *etcdv3.Client { return r.client } + +//SetClient set the etcdv3 client func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { r.client = client } + +// func (r *etcdV3Registry) ClientLock() *sync.Mutex { return &r.cltLock } -func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup { - return &r.wg -} -func (r *etcdV3Registry) GetDone() chan struct{} { - return r.done -} -func (r *etcdV3Registry) RestartCallBack() bool { - - services := []common.URL{} - for _, confIf := range r.services { - services = append(services, confIf) - } - - flag := true - for _, confIf := range services { - err := r.Register(confIf) - if err != nil { - logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) - flag = false - break - } - logger.Infof("success to re-register service :%v", confIf.Key()) - } - return flag -} func newETCDV3Registry(url *common.URL) (registry.Registry, error) { @@ -122,12 +83,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String()) - r := &etcdV3Registry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - } + r := &etcdV3Registry{} + + r.InitBaseRegistry(url, r) if err := etcdv3.ValidateClient( r, @@ -137,89 +95,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { ); err != nil { return nil, err } + r.WaitGroup().Add(1) //etcdv3 client start successful, then wg +1 - r.wg.Add(1) go etcdv3.HandleClientRestart(r) - r.listener = etcdv3.NewEventListener(r.client) - r.configListener = NewConfigurationListener(r) - r.dataListener = NewRegistryDataListener(r.configListener) + r.InitListeners() return r, nil } -func (r *etcdV3Registry) GetUrl() common.URL { - return *r.URL -} - -func (r *etcdV3Registry) IsAvailable() bool { - - select { - case <-r.done: - return false - default: - return true - } +func (r *etcdV3Registry) InitListeners() { + r.listener = etcdv3.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) } -func (r *etcdV3Registry) Destroy() { - - if r.configListener != nil { - r.configListener.Close() - } - r.stop() +func (r *etcdV3Registry) DoRegister(root string, node string) error { + return r.client.Create(path.Join(root, node), "") } -func (r *etcdV3Registry) stop() { - - close(r.done) - - // close current client +func (r *etcdV3Registry) CloseAndNilClient() { r.client.Close() - - r.cltLock.Lock() r.client = nil - r.services = nil - r.cltLock.Unlock() } -func (r *etcdV3Registry) Register(svc common.URL) error { - - role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - if err != nil { - return perrors.WithMessage(err, "get registry role") - } - - r.cltLock.Lock() - if _, ok := r.services[svc.Key()]; ok { - r.cltLock.Unlock() - return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path)) - } - r.cltLock.Unlock() - - switch role { - case common.PROVIDER: - logger.Debugf("(provider register )Register(conf{%#v})", svc) - if err := r.registerProvider(svc); err != nil { - return perrors.WithMessage(err, "register provider") - } - case common.CONSUMER: - logger.Debugf("(consumer register )Register(conf{%#v})", svc) - if err := r.registerConsumer(svc); err != nil { - return perrors.WithMessage(err, "register consumer") - } - default: - return perrors.New(fmt.Sprintf("unknown role %d", role)) +func (r *etcdV3Registry) CloseListener() { + if r.configListener != nil { + r.configListener.Close() } - - r.cltLock.Lock() - r.services[svc.Key()] = svc - r.cltLock.Unlock() - return nil } -func (r *etcdV3Registry) createDirIfNotExist(k string) error { - +func (r *etcdV3Registry) CreatePath(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) @@ -231,89 +137,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error { return nil } -func (r *etcdV3Registry) registerConsumer(svc common.URL) error { - - consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER]) - if err := r.createDirIfNotExist(consumersNode); err != nil { - logger.Errorf("etcd client create path %s: %v", consumersNode, err) - return perrors.WithMessage(err, "etcd create consumer nodes") - } - providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) - if err := r.createDirIfNotExist(providersNode); err != nil { - return perrors.WithMessage(err, "create provider node") - } - - params := url.Values{} - - params.Add("protocol", svc.Protocol) - - params.Add("category", (common.RoleType(common.CONSUMER)).String()) - params.Add("dubbo", "dubbogo-consumer-"+constant.Version) - - encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) - dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) - if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) - } - - return nil -} - -func (r *etcdV3Registry) registerProvider(svc common.URL) error { - - if len(svc.Path) == 0 || len(svc.Methods) == 0 { - return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods)) - } - - var ( - urlPath string - encodedURL string - dubboPath string - ) - - providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) - if err := r.createDirIfNotExist(providersNode); err != nil { - return perrors.WithMessage(err, "create provider node") - } - - params := url.Values{} - - svc.RangeParams(func(key, value string) bool { - params[key] = []string{value} - return true - }) - params.Add("pid", processID) - params.Add("ip", localIP) - params.Add("anyhost", "true") - params.Add("category", (common.RoleType(common.PROVIDER)).String()) - params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) - params.Add("side", (common.RoleType(common.PROVIDER)).Role()) - - if len(svc.Methods) == 0 { - params.Add("methods", strings.Join(svc.Methods, ",")) - } - - logger.Debugf("provider url params:%#v", params) - var host string - if len(svc.Ip) == 0 { - host = localIP + ":" + svc.Port - } else { - host = svc.Ip + ":" + svc.Port - } - - urlPath = svc.Path - - encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode())) - dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) - - if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) - } - - return nil -} - -func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { +func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( configListener *configurationListener @@ -346,37 +170,3 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) { return configListener, nil } - -//subscribe from registry -func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { - for { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - - listener, err := r.subscribe(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } - - for { - if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) - } - - } - - } -} diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index 6c05a8a..87cf240 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -46,7 +46,8 @@ func initRegistry(t *testing.T) *etcdV3Registry { } out := reg.(*etcdV3Registry) - out.client.CleanKV() + err = out.client.CleanKV() + assert.NoError(t, err) return out } @@ -58,6 +59,7 @@ func (suite *RegistryTestSuite) TestRegister() { reg := initRegistry(t) err := reg.Register(url) + assert.NoError(t, err) children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers") if err != nil { t.Fatal(err) @@ -83,8 +85,9 @@ func (suite *RegistryTestSuite) TestSubscribe() { regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) reg2 := initRegistry(t) - reg2.Register(url) - listener, err := reg2.subscribe(&url) + err = reg2.Register(url) + assert.NoError(t, err) + listener, err := reg2.DoSubscribe(&url) if err != nil { t.Fatal(err) } @@ -102,7 +105,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() { url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) reg := initRegistry(t) - _, err := reg.subscribe(&url) + _, err := reg.DoSubscribe(&url) if err != nil { t.Fatal(err) } @@ -120,7 +123,8 @@ func (suite *RegistryTestSuite) TestProviderDestory() { t := suite.T() reg := initRegistry(t) url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - reg.Register(url) + err := reg.Register(url) + assert.NoError(t, err) //listener.Close() time.Sleep(1e9) diff --git a/registry/registry.go b/registry/registry.go index c7279a2..d673864 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -21,7 +21,13 @@ import ( "github.com/apache/dubbo-go/common" ) -// Extension - Registry +/* + * -----------------------------------NOTICE--------------------------------------------- + * If there is no special case, you'd better inherit BaseRegistry and implement the + * FacadeBasedRegistry interface instead of directly implementing the Registry interface. + * -------------------------------------------------------------------------------------- + */ +// Registry Extension - Registry type Registry interface { common.Node //used for service provider calling , register services to registry @@ -38,11 +44,13 @@ type Registry interface { //mode2 : callback mode, subscribe with notify(notify listener). Subscribe(*common.URL, NotifyListener) } + +// NotifyListener ... type NotifyListener interface { Notify(*ServiceEvent) } -//Deprecated! +// Listener Deprecated! type Listener interface { Next() (*ServiceEvent, error) Close() diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 53a5926..e895243 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -36,18 +36,23 @@ import ( zk "github.com/apache/dubbo-go/remoting/zookeeper" ) +// RegistryDataListener ... type RegistryDataListener struct { interestedURL []*common.URL listener config_center.ConfigurationListener } +// NewRegistryDataListener ... func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener { return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}} } + +// AddInterestedURL ... func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } +// DataChange ... func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { // Intercept the last bit index := strings.Index(eventType.Path, "/providers/") @@ -71,6 +76,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { return false } +// RegistryConfigurationListener ... type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry @@ -79,14 +85,18 @@ type RegistryConfigurationListener struct { closeOnce sync.Once } +// NewRegistryConfigurationListener for listening the event of zk. func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { - reg.wg.Add(1) + reg.WaitGroup().Add(1) return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} } + +// Process ... func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } +// Next ... func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -94,7 +104,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.") return nil, perrors.New("listener stopped") - case <-l.registry.done: + case <-l.registry.Done(): logger.Warnf("zk consumer register has quit, so zk event listener exit now.") return nil, perrors.New("listener stopped") @@ -111,11 +121,13 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { } } } + +// Close ... func (l *RegistryConfigurationListener) Close() { // ensure that the listener will be closed at most once. l.closeOnce.Do(func() { l.isClosed = true - l.registry.wg.Done() + l.registry.WaitGroup().Done() }) } diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 24c4158..f4e53dc 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -18,20 +18,16 @@ package zookeeper import ( - "context" "fmt" "net/url" - "os" - "strconv" "strings" "sync" "time" ) import ( - gxnet "github.com/dubbogo/gost/net" + "github.com/dubbogo/go-zookeeper/zk" perrors "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" ) import ( @@ -44,20 +40,11 @@ import ( ) const ( - RegistryZkClient = "zk registry" - RegistryConnDelay = 3 - MaxWaitInterval = time.Duration(3e9) -) - -var ( - processID = "" - localIP = "" + // RegistryZkClient zk client name + RegistryZkClient = "zk registry" ) func init() { - processID = fmt.Sprintf("%d", os.Getpid()) - localIP, _ = gxnet.GetLocalIP() - //plugins.PluggableRegistries["zookeeper"] = newZkRegistry extension.SetRegistry("zookeeper", newZkRegistry) } @@ -66,20 +53,13 @@ func init() { ///////////////////////////////////// type zkRegistry struct { - context context.Context - *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - wg sync.WaitGroup // wg+done for zk restart - done chan struct{} - - cltLock sync.Mutex - client *zookeeper.ZookeeperClient - services map[string]common.URL // service name + protocol -> service config - + registry.BaseRegistry + client *zookeeper.ZookeeperClient listenerLock sync.Mutex listener *zookeeper.ZkEventListener dataListener *RegistryDataListener configListener *RegistryConfigurationListener + cltLock sync.Mutex //for provider zkPath map[string]int // key = protocol://ip:port/interface } @@ -89,21 +69,17 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { err error r *zkRegistry ) - r = &zkRegistry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - zkPath: make(map[string]int), + zkPath: make(map[string]int), } + r.InitBaseRegistry(url, r) err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient)) if err != nil { return nil, err } + r.WaitGroup().Add(1) //zk client start successful, then wg +1 - r.wg.Add(1) go zookeeper.HandleClientRestart(r) r.listener = zookeeper.NewZkEventListener(r.client) @@ -113,10 +89,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { return r, nil } +// Options ... type Options struct { client *zookeeper.ZookeeperClient } +// Option ... type Option func(*Options) func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) { @@ -128,27 +106,41 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust ) r = &zkRegistry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - zkPath: make(map[string]int), + zkPath: make(map[string]int), } - + r.InitBaseRegistry(url, r) c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...) if err != nil { return nil, nil, err } - r.wg.Add(1) + r.WaitGroup().Add(1) //zk client start successful, then wg +1 go zookeeper.HandleClientRestart(r) + r.InitListeners() + return c, r, nil +} +func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) r.configListener = NewRegistryConfigurationListener(r.client, r) r.dataListener = NewRegistryDataListener(r.configListener) +} - return c, r, nil +func (r *zkRegistry) CreatePath(path string) error { + return r.ZkClient().Create(path) +} + +func (r *zkRegistry) DoRegister(root string, node string) error { + return r.registerTempZookeeperNode(root, node) +} + +func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { + return r.getListener(conf) } +func (r *zkRegistry) CloseAndNilClient() { + r.client.Close() + r.client = nil +} func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { return r.client } @@ -161,222 +153,10 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex { return &r.cltLock } -func (r *zkRegistry) WaitGroup() *sync.WaitGroup { - return &r.wg -} - -func (r *zkRegistry) GetDone() chan struct{} { - return r.done -} - -func (r *zkRegistry) GetUrl() common.URL { - return *r.URL -} - -func (r *zkRegistry) Destroy() { +func (r *zkRegistry) CloseListener() { if r.configListener != nil { r.configListener.Close() } - close(r.done) - r.wg.Wait() - r.closeRegisters() -} - -func (r *zkRegistry) RestartCallBack() bool { - - // copy r.services - services := []common.URL{} - for _, confIf := range r.services { - services = append(services, confIf) - } - - flag := true - for _, confIf := range services { - err := r.register(confIf) - if err != nil { - logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) - flag = false - break - } - logger.Infof("success to re-register service :%v", confIf.Key()) - } - r.listener = zookeeper.NewZkEventListener(r.client) - r.configListener = NewRegistryConfigurationListener(r.client, r) - r.dataListener = NewRegistryDataListener(r.configListener) - - return flag -} - -func (r *zkRegistry) Register(conf common.URL) error { - var ( - ok bool - err error - ) - role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - switch role { - case common.CONSUMER: - r.cltLock.Lock() - _, ok = r.services[conf.Key()] - r.cltLock.Unlock() - if ok { - return perrors.Errorf("Path{%s} has been registered", conf.Path) - } - - err = r.register(conf) - if err != nil { - return perrors.WithStack(err) - } - - r.cltLock.Lock() - r.services[conf.Key()] = conf - r.cltLock.Unlock() - logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf) - - case common.PROVIDER: - - // Check if the service has been registered - r.cltLock.Lock() - // Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path). - // Because the consumer wants to provide monitoring functions for the selector, - // the provider allows multiple groups or versions of the same service to be registered. - _, ok = r.services[conf.Key()] - r.cltLock.Unlock() - if ok { - return perrors.Errorf("Path{%s} has been registered", conf.Key()) - } - - err = r.register(conf) - if err != nil { - return perrors.WithMessagef(err, "register(conf:%+v)", conf) - } - - r.cltLock.Lock() - r.services[conf.Key()] = conf - r.cltLock.Unlock() - - logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf) - } - - return nil -} - -func (r *zkRegistry) service(c common.URL) string { - return url.QueryEscape(c.Service()) -} - -func (r *zkRegistry) register(c common.URL) error { - var ( - err error - //revision string - params url.Values - rawURL string - encodedURL string - dubboPath string - //conf config.URL - ) - - err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient)) - if err != nil { - return perrors.WithStack(err) - } - params = url.Values{} - - c.RangeParams(func(key, value string) bool { - params.Add(key, value) - return true - }) - - params.Add("pid", processID) - params.Add("ip", localIP) - //params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6)) - - role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - switch role { - - case common.PROVIDER: - - if c.Path == "" || len(c.Methods) == 0 { - return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) - } - // 先创建服务下面的provider node - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - r.cltLock.Lock() - err = r.client.Create(dubboPath) - r.cltLock.Unlock() - if err != nil { - logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) - return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath) - } - params.Add("anyhost", "true") - - // Dubbo java consumer to start looking for the provider url,because the category does not match, - // the provider will not find, causing the consumer can not start, so we use consumers. - // DubboRole = [...]string{"consumer", "", "", "provider"} - // params.Add("category", (RoleType(PROVIDER)).Role()) - params.Add("category", (common.RoleType(common.PROVIDER)).String()) - params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) - - params.Add("side", (common.RoleType(common.PROVIDER)).Role()) - - if len(c.Methods) == 0 { - params.Add("methods", strings.Join(c.Methods, ",")) - } - logger.Debugf("provider zk url params:%#v", params) - var host string - if c.Ip == "" { - host = localIP + ":" + c.Port - } else { - host = c.Ip + ":" + c.Port - } - - rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode()) - encodedURL = url.QueryEscape(rawURL) - - // Print your own registration service providers. - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String()) - logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) - - case common.CONSUMER: - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) - r.cltLock.Lock() - err = r.client.Create(dubboPath) - r.cltLock.Unlock() - if err != nil { - logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) - return perrors.WithStack(err) - } - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - r.cltLock.Lock() - err = r.client.Create(dubboPath) - r.cltLock.Unlock() - if err != nil { - logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) - return perrors.WithStack(err) - } - - params.Add("protocol", c.Protocol) - - params.Add("category", (common.RoleType(common.CONSUMER)).String()) - params.Add("dubbo", "dubbogo-consumer-"+constant.Version) - - rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode()) - encodedURL = url.QueryEscape(rawURL) - - dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String()) - logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) - - default: - return perrors.Errorf("@c{%v} type is not referencer or provider", c) - } - - dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") - err = r.registerTempZookeeperNode(dubboPath, encodedURL) - - if err != nil { - return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL) - } - return nil } func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { @@ -406,53 +186,6 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { return nil } -func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) { - return r.getListener(conf) -} -func sleepWait(n int) { - wait := time.Duration((n + 1) * 2e8) - if wait > MaxWaitInterval { - wait = MaxWaitInterval - } - time.Sleep(wait) -} - -//subscribe from registry -func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { - n := 0 - for { - n++ - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - - listener, err := r.subscribe(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } - - for { - if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - break - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) - } - - } - sleepWait(n) - } -} - func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) { var ( zkListener *RegistryConfigurationListener @@ -489,22 +222,3 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen return zkListener, nil } - -func (r *zkRegistry) closeRegisters() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - logger.Infof("begin to close provider zk client") - // Close the old client first to close the tmp node. - r.client.Close() - r.client = nil - r.services = nil -} - -func (r *zkRegistry) IsAvailable() bool { - select { - case <-r.done: - return false - default: - return true - } -} diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 2c7bb90..5e5189c 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -64,7 +64,7 @@ func Test_Subscribe(t *testing.T) { _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) reg2.Register(url) - listener, _ := reg2.subscribe(&url) + listener, _ := reg2.DoSubscribe(&url) serviceEvent, _ := listener.Next() assert.NoError(t, err) @@ -85,7 +85,7 @@ func Test_ConsumerDestory(t *testing.T) { assert.NoError(t, err) err = reg.Register(url) assert.NoError(t, err) - _, err = reg.subscribe(&url) + _, err = reg.DoSubscribe(&url) assert.NoError(t, err) //listener.Close() diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 0509685..ba3ea6e 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -36,16 +36,22 @@ import ( ) const ( - ConnDelay = 3 - MaxFailTimes = 15 + // ConnDelay connection dalay + ConnDelay = 3 + // MaxFailTimes max failure times + MaxFailTimes = 15 + // RegistryETCDV3Client client name RegistryETCDV3Client = "etcd registry" ) var ( + // ErrNilETCDV3Client ... ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR - ErrKVPairNotFound = perrors.New("k/v pair not found") + // ErrKVPairNotFound ... + ErrKVPairNotFound = perrors.New("k/v pair not found") ) +// Options ... type Options struct { name string endpoints []string @@ -54,30 +60,38 @@ type Options struct { heartbeat int // heartbeat second } +// Option ... type Option func(*Options) +// WithEndpoints ... func WithEndpoints(endpoints ...string) Option { return func(opt *Options) { opt.endpoints = endpoints } } + +// WithName ... func WithName(name string) Option { return func(opt *Options) { opt.name = name } } + +// WithTimeout ... func WithTimeout(timeout time.Duration) Option { return func(opt *Options) { opt.timeout = timeout } } +// WithHeartbeat ... func WithHeartbeat(heartbeat int) Option { return func(opt *Options) { opt.heartbeat = heartbeat } } +// ValidateClient ... func ValidateClient(container clientFacade, opts ...Option) error { options := &Options{ @@ -117,6 +131,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { return nil } +// Client ... type Client struct { lock sync.RWMutex @@ -191,6 +206,7 @@ func (c *Client) stop() bool { return false } +// Close ... func (c *Client) Close() { if c == nil { @@ -309,6 +325,7 @@ func (c *Client) get(k string) (string, error) { return string(resp.Kvs[0].Value), nil } +// CleanKV ... func (c *Client) CleanKV() error { c.lock.RLock() @@ -408,10 +425,12 @@ func (c *Client) keepAliveKV(k string, v string) error { return nil } +// Done ... func (c *Client) Done() <-chan struct{} { return c.exit } +// Valid ... func (c *Client) Valid() bool { select { case <-c.exit: @@ -428,6 +447,7 @@ func (c *Client) Valid() bool { return true } +// Create ... func (c *Client) Create(k string, v string) error { err := c.put(k, v) @@ -437,6 +457,7 @@ func (c *Client) Create(k string, v string) error { return nil } +// Delete ... func (c *Client) Delete(k string) error { err := c.delete(k) @@ -447,6 +468,7 @@ func (c *Client) Delete(k string) error { return nil } +// RegisterTemp ... func (c *Client) RegisterTemp(basePath string, node string) (string, error) { completeKey := path.Join(basePath, node) @@ -459,6 +481,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { return completeKey, nil } +// GetChildrenKVList ... func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { kList, vList, err := c.getChildren(k) @@ -468,6 +491,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) { return kList, vList, nil } +// Get ... func (c *Client) Get(k string) (string, error) { v, err := c.get(k) @@ -478,6 +502,7 @@ func (c *Client) Get(k string) (string, error) { return v, nil } +// Watch ... func (c *Client) Watch(k string) (clientv3.WatchChan, error) { wc, err := c.watch(k) @@ -487,6 +512,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) { return wc, nil } +// WatchWithPrefix ... func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { wc, err := c.watchWithPrefix(prefix) diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go index 499044b..35befc8 100644 --- a/remoting/etcdv3/facade.go +++ b/remoting/etcdv3/facade.go @@ -38,11 +38,12 @@ type clientFacade interface { SetClient(*Client) ClientLock() *sync.Mutex WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container - GetDone() chan struct{} //for etcd client control + Done() chan struct{} //for etcd client control RestartCallBack() bool common.Node } +// HandleClientRestart ... func HandleClientRestart(r clientFacade) { var ( @@ -54,7 +55,7 @@ func HandleClientRestart(r clientFacade) { LOOP: for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...") break LOOP // re-register all services @@ -71,7 +72,7 @@ LOOP: failTimes = 0 for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") break LOOP case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index a4d5805..a51a68b 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -33,6 +33,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// EventListener ... type EventListener struct { client *Client keyMapLock sync.Mutex @@ -40,6 +41,7 @@ type EventListener struct { wg sync.WaitGroup } +// NewEventListener ... func NewEventListener(client *Client) *EventListener { return &EventListener{ client: client, @@ -47,7 +49,7 @@ func NewEventListener(client *Client) *EventListener { } } -// Listen on a spec key +// ListenServiceNodeEvent Listen on a spec key // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { @@ -134,7 +136,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin panic("unreachable") } -// Listen on a set of key with spec prefix +// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { l.wg.Add(1) @@ -180,7 +182,7 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener +// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent @@ -229,6 +231,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis }(key) } +// Close ... func (l *EventListener) Close() { l.wg.Wait() } diff --git a/remoting/listener.go b/remoting/listener.go index 8d1e357..3713ba0 100644 --- a/remoting/listener.go +++ b/remoting/listener.go @@ -21,6 +21,7 @@ import ( "fmt" ) +// DataListener ... type DataListener interface { DataChange(eventType Event) bool //bool is return for interface implement is interesting } @@ -29,11 +30,15 @@ type DataListener interface { // event type ////////////////////////////////////////// +// EventType ... type EventType int const ( + // EventTypeAdd ... EventTypeAdd = iota + // EventTypeDel ... EventTypeDel + // EventTypeUpdate ... EventTypeUpdate ) @@ -51,6 +56,7 @@ func (t EventType) String() string { // service event ////////////////////////////////////////// +// Event ... type Event struct { Path string Action EventType diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 19d6529..f95231b 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -25,8 +25,8 @@ import ( ) import ( + "github.com/dubbogo/go-zookeeper/zk" perrors "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" ) import ( @@ -35,14 +35,19 @@ import ( ) const ( - ConnDelay = 3 + // ConnDelay connection delay interval + ConnDelay = 3 + // MaxFailTimes max fail times MaxFailTimes = 15 ) var ( errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") + errNilChildren = perrors.Errorf("has none children") + errNilNode = perrors.Errorf("node does not exist") ) +// ZookeeperClient ... type ZookeeperClient struct { name string ZkAddrs []string @@ -54,6 +59,7 @@ type ZookeeperClient struct { eventRegistry map[string][]*chan struct{} } +// StateToString ... func StateToString(state zk.State) string { switch state { case zk.StateDisconnected: @@ -85,6 +91,7 @@ func StateToString(state zk.State) string { return "zookeeper unknown state" } +// Options ... type Options struct { zkName string client *ZookeeperClient @@ -92,14 +99,17 @@ type Options struct { ts *zk.TestCluster } +// Option ... type Option func(*Options) +// WithZkName ... func WithZkName(name string) Option { return func(opt *Options) { opt.zkName = name } } +// ValidateZookeeperClient ... func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { var ( err error @@ -173,12 +183,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (* return z, nil } +// WithTestCluster ... func WithTestCluster(ts *zk.TestCluster) Option { return func(opt *Options) { opt.ts = ts } } +// NewMockZookeeperClient ... func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) { var ( err error @@ -224,6 +236,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) return ts, z, event, nil } +// HandleZkEvent ... func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { var ( state int @@ -248,11 +261,13 @@ LOOP: logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name) z.stop() z.Lock() - if z.Conn != nil { - z.Conn.Close() - z.Conn = nil - } + conn := z.Conn + z.Conn = nil z.Unlock() + if conn != nil { + conn.Close() + } + break LOOP case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged): logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path) @@ -282,6 +297,7 @@ LOOP: } } +// RegisterEvent ... func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" || event == nil { return @@ -296,6 +312,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { z.Unlock() } +// UnregisterEvent ... func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { if zkPath == "" { return @@ -322,6 +339,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) { } } +// Done ... func (z *ZookeeperClient) Done() <-chan struct{} { return z.exit } @@ -337,6 +355,7 @@ func (z *ZookeeperClient) stop() bool { return false } +// ZkConnValid ... func (z *ZookeeperClient) ZkConnValid() bool { select { case <-z.exit: @@ -354,6 +373,7 @@ func (z *ZookeeperClient) ZkConnValid() bool { return valid } +// Close ... func (z *ZookeeperClient) Close() { if z == nil { return @@ -362,14 +382,17 @@ func (z *ZookeeperClient) Close() { z.stop() z.Wait.Wait() z.Lock() - if z.Conn != nil { - z.Conn.Close() - z.Conn = nil - } + conn := z.Conn + z.Conn = nil z.Unlock() + if conn != nil { + conn.Close() + } + logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) } +// Create ... func (z *ZookeeperClient) Create(basePath string) error { var ( err error @@ -381,10 +404,12 @@ func (z *ZookeeperClient) Create(basePath string) error { tmpPath = path.Join(tmpPath, "/", str) err = errNilZkClientConn z.Lock() - if z.Conn != nil { - _, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) - } + conn := z.Conn z.Unlock() + if conn != nil { + _, err = conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) + } + if err != nil { if err == zk.ErrNodeExists { logger.Infof("zk.create(\"%s\") exists\n", tmpPath) @@ -398,6 +423,7 @@ func (z *ZookeeperClient) Create(basePath string) error { return nil } +// Delete ... func (z *ZookeeperClient) Delete(basePath string) error { var ( err error @@ -405,14 +431,16 @@ func (z *ZookeeperClient) Delete(basePath string) error { err = errNilZkClientConn z.Lock() - if z.Conn != nil { - err = z.Conn.Delete(basePath, -1) - } + conn := z.Conn z.Unlock() + if conn != nil { + err = conn.Delete(basePath, -1) + } return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath) } +// RegisterTemp ... func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) { var ( err error @@ -425,10 +453,12 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er data = []byte("") zkPath = path.Join(basePath) + "/" + node z.Lock() - if z.Conn != nil { - tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) - } + conn := z.Conn z.Unlock() + if conn != nil { + tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + } + //if err != nil && err != zk.ErrNodeExists { if err != nil { logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err)) @@ -439,6 +469,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er return tmpPath, nil } +// RegisterTempSeq ... func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) { var ( err error @@ -447,15 +478,17 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, err = errNilZkClientConn z.Lock() - if z.Conn != nil { - tmpPath, err = z.Conn.Create( + conn := z.Conn + z.Unlock() + if conn != nil { + tmpPath, err = conn.Create( path.Join(basePath)+"/", data, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll), ) } - z.Unlock() + logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath) if err != nil && err != zk.ErrNodeExists { logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n", @@ -467,37 +500,44 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, return tmpPath, nil } +// GetChildrenW ... func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) { var ( err error children []string stat *zk.Stat - event <-chan zk.Event + watcher *zk.Watcher ) err = errNilZkClientConn z.Lock() - if z.Conn != nil { - children, stat, event, err = z.Conn.ChildrenW(path) - } + conn := z.Conn z.Unlock() + if conn != nil { + children, stat, watcher, err = conn.ChildrenW(path) + } + if err != nil { + if err == zk.ErrNoChildrenForEphemerals { + return nil, nil, errNilChildren + } if err == zk.ErrNoNode { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, errNilNode } logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err) return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path) } if stat == nil { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, perrors.Errorf("path{%s} get stat is nil", path) } if len(children) == 0 { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, errNilChildren } - return children, event, nil + return children, watcher.EvtCh, nil } +// GetChildren ... func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { var ( err error @@ -507,10 +547,12 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { err = errNilZkClientConn z.Lock() - if z.Conn != nil { - children, stat, err = z.Conn.Children(path) - } + conn := z.Conn z.Unlock() + if conn != nil { + children, stat, err = conn.Children(path) + } + if err != nil { if err == zk.ErrNoNode { return nil, perrors.Errorf("path{%s} has none children", path) @@ -522,25 +564,28 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { return nil, perrors.Errorf("path{%s} has none children", path) } if len(children) == 0 { - return nil, perrors.Errorf("path{%s} has none children", path) + return nil, errNilChildren } return children, nil } +// ExistW ... func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { var ( - exist bool - err error - event <-chan zk.Event + exist bool + err error + watcher *zk.Watcher ) err = errNilZkClientConn z.Lock() - if z.Conn != nil { - exist, _, event, err = z.Conn.ExistsW(zkPath) - } + conn := z.Conn z.Unlock() + if conn != nil { + exist, _, watcher, err = conn.ExistsW(zkPath) + } + if err != nil { logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err)) return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath) @@ -550,9 +595,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { return nil, perrors.Errorf("zkClient{%s} App zk path{%s} does not exist.", z.name, zkPath) } - return event, nil + return watcher.EvtCh, nil } +// GetContent ... func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) { return z.Conn.Get(zkPath) } diff --git a/remoting/zookeeper/client_test.go b/remoting/zookeeper/client_test.go index f1bd0c2..cb41eb3 100644 --- a/remoting/zookeeper/client_test.go +++ b/remoting/zookeeper/client_test.go @@ -24,7 +24,7 @@ import ( ) import ( - "github.com/samuel/go-zookeeper/zk" + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) @@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) { states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} verifyEventStateOrder(t, event, states, "event channel") } + +func Test_UnregisterEvent(t *testing.T) { + client := &ZookeeperClient{} + client.eventRegistry = make(map[string][]*chan struct{}) + array := []*chan struct{}{} + array = append(array, new(chan struct{})) + client.eventRegistry["test"] = array + client.UnregisterEvent("test", new(chan struct{})) +} diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index cdc7ead..055db4f 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -35,11 +35,12 @@ type zkClientFacade interface { SetZkClient(*ZookeeperClient) ZkClientLock() *sync.Mutex WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container - GetDone() chan struct{} //for zk client control + Done() chan struct{} //for zk client control RestartCallBack() bool common.Node } +// HandleClientRestart ... func HandleClientRestart(r zkClientFacade) { var ( err error @@ -51,7 +52,7 @@ func HandleClientRestart(r zkClientFacade) { LOOP: for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP // re-register all services @@ -67,7 +68,7 @@ LOOP: failTimes = 0 for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk. diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go index 58e0d69..175d758 100644 --- a/remoting/zookeeper/facade_test.go +++ b/remoting/zookeeper/facade_test.go @@ -24,7 +24,7 @@ import ( "time" ) import ( - "github.com/samuel/go-zookeeper/zk" + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) import ( @@ -55,7 +55,7 @@ func (r *mockFacade) WaitGroup() *sync.WaitGroup { return &r.wg } -func (r *mockFacade) GetDone() chan struct{} { +func (r *mockFacade) Done() chan struct{} { return r.done } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 0b9db5e..4493c06 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -19,15 +19,14 @@ package zookeeper import ( "path" - "strings" "sync" "time" ) import ( "github.com/dubbogo/getty" + "github.com/dubbogo/go-zookeeper/zk" perrors "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" ) import ( @@ -35,6 +34,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) +// ZkEventListener ... type ZkEventListener struct { client *ZookeeperClient pathMapLock sync.Mutex @@ -42,6 +42,7 @@ type ZkEventListener struct { wg sync.WaitGroup } +// NewZkEventListener ... func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { return &ZkEventListener{ client: client, @@ -49,10 +50,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener { } } +// SetClient ... func (l *ZkEventListener) SetClient(client *ZookeeperClient) { l.client = client } +// ListenServiceNodeEvent ... func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { l.wg.Add(1) defer l.wg.Done() @@ -107,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newChildren, err := l.client.GetChildren(zkPath) if err != nil { - logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) - return + if err == errNilChildren { + content, _, err := l.client.Conn.Get(zkPath) + if err != nil { + logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err)) + } else { + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)}) + } + + } else { + logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) + } } // a node was added -- listen the new node @@ -178,7 +190,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi if MaxFailTimes <= failTimes { failTimes = MaxFailTimes } - logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) + logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err) // clear the event channel CLEAR: for { @@ -189,6 +201,11 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi } } l.client.RegisterEvent(zkPath, &event) + if err == errNilNode { + logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath) + l.client.UnregisterEvent(zkPath, &event) + return + } select { case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): l.client.UnregisterEvent(zkPath, &event) @@ -263,56 +280,11 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener +// ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { - var ( - err error - dubboPath string - children []string - ) - - zkPath = strings.ReplaceAll(zkPath, "$", "%24") - l.pathMapLock.Lock() - _, ok := l.pathMap[zkPath] - l.pathMapLock.Unlock() - if ok { - logger.Warnf("@zkPath %s has already been listened.", zkPath) - return - } - - l.pathMapLock.Lock() - l.pathMap[zkPath] = struct{}{} - l.pathMapLock.Unlock() - - logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) - children, err = l.client.GetChildren(zkPath) - if err != nil { - children = nil - logger.Warnf("fail to get children of zk path{%s}", zkPath) - } - - for _, c := range children { - // listen l service node - dubboPath = path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string, listener remoting.DataListener) { - if l.ListenServiceNodeEvent(zkPath) { - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) - } - logger.Infof("listen dubbo path{%s}", zkPath) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) @@ -324,6 +296,7 @@ func (l *ZkEventListener) valid() bool { return l.client.ZkConnValid() } +// Close ... func (l *ZkEventListener) Close() { l.wg.Wait() } diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go index aa627c7..43e9aca 100644 --- a/remoting/zookeeper/listener_test.go +++ b/remoting/zookeeper/listener_test.go @@ -24,7 +24,7 @@ import ( "time" ) import ( - "github.com/samuel/go-zookeeper/zk" + "github.com/dubbogo/go-zookeeper/zk" "github.com/stretchr/testify/assert" ) import ( @@ -97,12 +97,11 @@ func TestListener(t *testing.T) { listener := NewZkEventListener(client) dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait} listener.ListenServiceEvent("/dubbo", dataListener) - + time.Sleep(1 * time.Second) _, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1) assert.NoError(t, err) wait.Wait() assert.Equal(t, changedData, dataListener.eventList[1].Content) - client.Close() }