This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ea60750f Add wildcard subscription support for zookeeper registry 
(#2267)
0ea60750f is described below

commit 0ea60750f724cdf57847e06af35a153deb763f23
Author: wudong5 <[email protected]>
AuthorDate: Tue Apr 18 10:54:04 2023 +0800

    Add wildcard subscription support for zookeeper registry (#2267)
---
 common/url.go                  |  31 ++++++++++
 common/url_test.go             | 131 +++++++++++++++++++++++++++++++++++++++++
 registry/zookeeper/listener.go |   8 ++-
 remoting/zookeeper/listener.go |  94 +++++++++++++++++++++++++++--
 4 files changed, 257 insertions(+), 7 deletions(-)

diff --git a/common/url.go b/common/url.go
index 058f76a89..2824ec68d 100644
--- a/common/url.go
+++ b/common/url.go
@@ -413,6 +413,37 @@ func ServiceKey(intf string, group string, version string) 
string {
        return buf.String()
 }
 
+// ParseServiceKey gets interface, group and version from service key
+func ParseServiceKey(serviceKey string) (string, string, string) {
+       var (
+               group   string
+               version string
+       )
+       if serviceKey == "" {
+               return "", "", ""
+       }
+       // get group if it exists
+       sepIndex := strings.Index(serviceKey, constant.PathSeparator)
+       if sepIndex != -1 {
+               group = serviceKey[:sepIndex]
+               serviceKey = serviceKey[sepIndex+1:]
+       }
+       // get version if it exists
+       sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator)
+       if sepIndex != -1 {
+               version = serviceKey[sepIndex+1:]
+               serviceKey = serviceKey[:sepIndex]
+       }
+
+       return serviceKey, group, version
+}
+
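+// IsAnyCondition reports whether intf is constant.AnyValue and group and version are each either constant.AnyValue or equal to serviceURL's group and version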
+func IsAnyCondition(intf, group, version string, serviceURL *URL) bool {
+       return intf == constant.AnyValue && (group == constant.AnyValue ||
+               group == serviceURL.Group()) && (version == constant.AnyValue 
|| version == serviceURL.Version())
+}
+
 // ColonSeparatedKey
 // The format is "{interface}:[version]:[group]"
 func (c *URL) ColonSeparatedKey() string {
diff --git a/common/url_test.go b/common/url_test.go
index 89953c3ab..2971e6c45 100644
--- a/common/url_test.go
+++ b/common/url_test.go
@@ -421,3 +421,134 @@ func TestCompareURLEqualFunc(t *testing.T) {
 func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool {
        return l.PrimitiveURL == r.PrimitiveURL
 }
+
+func TestParseServiceKey(t *testing.T) {
+       type args struct {
+               serviceKey string
+       }
+       tests := []struct {
+               name  string
+               args  args
+               want  string
+               want1 string
+               want2 string
+       }{
+               {
+                       name: "test1",
+                       args: args{
+                               serviceKey: "group/interface:version",
+                       },
+                       want:  "interface",
+                       want1: "group",
+                       want2: "version",
+               },
+               {
+                       name: "test2",
+                       args: args{
+                               serviceKey: "*/*:*",
+                       },
+                       want:  "*",
+                       want1: "*",
+                       want2: "*",
+               },
+               {
+                       name: "test3",
+                       args: args{
+                               serviceKey: 
"group/org.apache.dubbo.mock.api.MockService",
+                       },
+                       want:  "org.apache.dubbo.mock.api.MockService",
+                       want1: "group",
+                       want2: "",
+               },
+               {
+                       name: "test4",
+                       args: args{
+                               serviceKey: 
"org.apache.dubbo.mock.api.MockService",
+                       },
+                       want:  "org.apache.dubbo.mock.api.MockService",
+                       want1: "",
+                       want2: "",
+               },
+               {
+                       name: "test5",
+                       args: args{
+                               serviceKey: "group/",
+                       },
+                       want:  "",
+                       want1: "group",
+                       want2: "",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got, got1, got2 := ParseServiceKey(tt.args.serviceKey)
+                       assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", 
tt.args.serviceKey)
+                       assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", 
tt.args.serviceKey)
+                       assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", 
tt.args.serviceKey)
+               })
+       }
+}
+
+func TestIsAnyCondition(t *testing.T) {
+       type args struct {
+               intf       string
+               group      string
+               version    string
+               serviceURL *URL
+       }
+       serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), 
WithParams(url.Values{
+               constant.GroupKey:   {"group"},
+               constant.VersionKey: {"version"},
+       }))
+       tests := []struct {
+               name string
+               args args
+               want bool
+       }{
+               {
+                       name: "test1",
+                       args: args{
+                               intf:       constant.AnyValue,
+                               group:      constant.AnyValue,
+                               version:    constant.AnyValue,
+                               serviceURL: serviceURL,
+                       },
+                       want: true,
+               },
+               {
+                       name: "test2",
+                       args: args{
+                               intf:       constant.AnyValue,
+                               group:      "group",
+                               version:    "version",
+                               serviceURL: serviceURL,
+                       },
+                       want: true,
+               },
+               {
+                       name: "test3",
+                       args: args{
+                               intf:       "intf",
+                               group:      constant.AnyValue,
+                               version:    constant.AnyValue,
+                               serviceURL: serviceURL,
+                       },
+                       want: false,
+               },
+               {
+                       name: "test4",
+                       args: args{
+                               intf:       constant.AnyValue,
+                               group:      "group1",
+                               version:    constant.AnyValue,
+                               serviceURL: serviceURL,
+                       },
+                       want: false,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, 
tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, 
%v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL)
+               })
+       }
+}
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 65871adb9..c8f0f26b3 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -92,8 +92,10 @@ func (l *RegistryDataListener) DataChange(event 
remoting.Event) bool {
        if l.closed {
                return false
        }
+       match := false
        for serviceKey, listener := range l.subscribed {
-               if serviceURL.ServiceKey() == serviceKey {
+               intf, group, version := common.ParseServiceKey(serviceKey)
+               if serviceURL.ServiceKey() == serviceKey || 
common.IsAnyCondition(intf, group, version, serviceURL) {
                        listener.Process(
                                &config_center.ConfigChangeEvent{
                                        Key:        event.Path,
@@ -101,10 +103,10 @@ func (l *RegistryDataListener) DataChange(event 
remoting.Event) bool {
                                        ConfigType: event.Action,
                                },
                        )
-                       return true
+                       match = true
                }
        }
-       return false
+       return match
 }
 
 // Close all RegistryConfigurationListener in subscribed
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 498fc3309..2e9abe3c8 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+       "net/url"
        "path"
        "strings"
        "sync"
@@ -238,8 +239,89 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                listener.DataChange(remoting.Event{Path: oldNode, Action: 
remoting.EventTypeDel})
        }
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener) {
+
+// listenAllDirEvents listens to all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener 
remoting.DataListener) {
+       var (
+               failTimes int
+               ttl       time.Duration
+       )
+       ttl = defaultTTL
+       if conf != nil {
+               if timeout, err := 
time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, 
constant.DefaultRegTTL)); err == nil {
+                       ttl = timeout
+               } else {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Wrong configuration for registry.ttl, error=%+v, using default value %v 
instead", err, defaultTTL)
+               }
+       }
+       if ttl > 20e9 {
+               ttl = 20e9
+       }
+
+       rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+       for {
+               // get all interfaces
+               children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+               if err != nil {
+                       failTimes++
+                       if MaxFailTimes <= failTimes {
+                               failTimes = MaxFailTimes
+                       }
+                       logger.Errorf("[Zookeeper 
EventListener][listenDirEvent] Get children of path {%s} with watcher failed, 
the error is %+v", rootPath, err)
+                       // The zookeeper may not be ready yet; sleep 
failTimes * ConnDelay seconds before retrying
+                       after := time.After(timeSecondDuration(failTimes * 
ConnDelay))
+                       select {
+                       case <-after:
+                               continue
+                       case <-l.exit:
+                               return
+                       }
+               }
+               failTimes = 0
+               if len(children) == 0 {
+                       logger.Warnf("[Zookeeper EventListener][listenDirEvent] 
Can not get any children for the path \"%s\", please check if the provider does 
ready.", rootPath)
+               }
+               for _, c := range children {
+                       // Build the child path
+                       zkRootPath := path.Join(rootPath, 
constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, 
constant.ProvidersCategory)
+                       // Record the path to avoid listening to it repeatedly
+                       l.pathMapLock.Lock()
+                       if _, ok := l.pathMap[zkRootPath]; ok {
+                               logger.Warnf("[Zookeeper 
EventListener][listenDirEvent] The child with zk path {%s} has already been 
listened.", zkRootPath)
+                               l.pathMapLock.Unlock()
+                               continue
+                       } else {
+                               l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+                       }
+                       l.pathMapLock.Unlock()
+                       logger.Debugf("[Zookeeper 
EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
+                       l.wg.Add(1)
+                       // listen every interface
+                       go l.listenDirEvent(conf, zkRootPath, listener, c)
+               }
+
+               ticker := time.NewTicker(ttl)
+               select {
+               case <-ticker.C:
+                       ticker.Stop()
+               case zkEvent := <-childEventCh:
+                       logger.Debugf("Get a zookeeper childEventCh{type:%s, 
server:%s, path:%s, state:%d-%s, err:%v}",
+                               zkEvent.Type.String(), zkEvent.Server, 
zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), 
zkEvent.Err)
+                       ticker.Stop()
+               case <-l.exit:
+                       logger.Warnf("listen(path{%s}) goroutine exit now...", 
rootPath)
+                       ticker.Stop()
+                       return
+               }
+       }
+}
+
+func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, 
listener remoting.DataListener, intf string) {
        defer l.wg.Done()
+       if intf == constant.AnyValue {
+               l.listenAllDirEvents(conf, listener)
+               return
+       }
        var (
                failTimes int
                ttl       time.Duration
@@ -279,7 +361,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, 
zkRootPath string, li
                        // Only need to compare Path when subscribing to 
provider
                        if strings.LastIndex(zkRootPath, 
constant.ProviderCategory) != -1 {
                                provider, _ := common.NewURL(c)
-                               if provider.ServiceKey() != conf.ServiceKey() {
+                               if provider.Interface() != intf || 
!common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), 
provider) {
                                        continue
                                }
                        }
@@ -326,7 +408,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, 
zkRootPath string, li
                }
                if l.startScheduleWatchTask(zkRootPath, children, ttl, 
listener, childEventCh) {
                        return
-
                }
        }
 }
@@ -367,6 +448,7 @@ func (l *ZkEventListener) startScheduleWatchTask(
                }
        }
 }
+
 func timeSecondDuration(sec int) time.Duration {
        return time.Duration(sec) * time.Second
 }
@@ -378,7 +460,11 @@ func (l *ZkEventListener) ListenServiceEvent(conf 
*common.URL, zkPath string, li
        logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath)
        l.wg.Add(1)
        go func(zkPath string, listener remoting.DataListener) {
-               l.listenDirEvent(conf, zkPath, listener)
+               intf := ""
+               if conf != nil {
+                       intf = conf.Interface()
+               }
+               l.listenDirEvent(conf, zkPath, listener, intf)
                logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) 
goroutine exit now", zkPath)
        }(zkPath, listener)
 }

Reply via email to