This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 5c2e47e  Fix #1166 (#1249)
5c2e47e is described below

commit 5c2e47e182bb95d0ecd0c4911de2b8a4cb5ef0ff
Author: XavierNiu <[email protected]>
AuthorDate: Mon Jun 14 15:37:09 2021 +0800

    Fix #1166 (#1249)
---
 registry/zookeeper/service_discovery_test.go | 25 ++++++++++------
 remoting/getty/getty_client_test.go          | 10 ++++---
 remoting/zookeeper/listener.go               | 43 +++++++++++++++++++++-------
 3 files changed, 55 insertions(+), 23 deletions(-)

diff --git a/registry/zookeeper/service_discovery_test.go 
b/registry/zookeeper/service_discovery_test.go
index 5d3d172..512031b 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -43,16 +43,15 @@ import (
        "dubbo.apache.org/dubbo-go/v3/registry/event"
 )
 
-var testName = "test"
-
-var tc *zk.TestCluster
+const testName = "test"
 
 func prepareData(t *testing.T) *zk.TestCluster {
        var err error
-       tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
+       tc, err := zk.StartTestCluster(1, nil, nil)
        assert.NoError(t, err)
        assert.NotNil(t, tc.Servers[0])
        address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
+       //address := "127.0.0.1:2181"
 
        config.GetBaseConfig().ServiceDiscoveries[testName] = 
&config.ServiceDiscoveryConfig{
                Protocol:  "zookeeper",
@@ -71,6 +70,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
        _, err := newZookeeperServiceDiscovery(name)
 
        // the ServiceDiscoveryConfig not found
+       // err: could not init the instance because the config is invalid
        assert.NotNil(t, err)
 
        sdc := &config.ServiceDiscoveryConfig{
@@ -81,10 +81,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
        _, err = newZookeeperServiceDiscovery(name)
 
        // RemoteConfig not found
+       // err: could not find the remote config for name: mock
        assert.NotNil(t, err)
 }
 
-func TestCURDZookeeperServiceDiscovery(t *testing.T) {
+func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
+       tc := prepareData(t)
+       defer func() {
+               _ = tc.Stop()
+       }()
+       t.Run("testCURDZookeeperServiceDiscovery", 
testCURDZookeeperServiceDiscovery)
+       t.Run("testAddListenerZookeeperServiceDiscovery", 
testAddListenerZookeeperServiceDiscovery)
+}
+
+func testCURDZookeeperServiceDiscovery(t *testing.T) {
        prepareData(t)
        extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
                return dispatcher.NewMockEventDispatcher()
@@ -164,10 +174,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
        assert.Nil(t, err)
 }
 
-func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
-       defer func() {
-               _ = tc.Stop()
-       }()
+func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
        sd, err := newZookeeperServiceDiscovery(testName)
        assert.Nil(t, err)
        defer func() {
diff --git a/remoting/getty/getty_client_test.go 
b/remoting/getty/getty_client_test.go
index a308462..a5b26ba 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -297,7 +297,7 @@ func testGetUser61(t *testing.T, c *Client) {
 
 func testClient_AsyncCall(t *testing.T, client *Client) {
        user := &User{}
-       lock := sync.Mutex{}
+       wg := sync.WaitGroup{}
        request := remoting.NewRequest("2.0.2")
        invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", 
nil, "username"},
                []reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), 
reflect.ValueOf("username")})
@@ -314,13 +314,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
                r := response.(remoting.AsyncCallbackResponse)
                rst := 
*r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
                assert.Equal(t, User{ID: "4", Name: "username"}, 
*(rst.Rest.(*User)))
-               lock.Unlock()
+               wg.Done()
        }
-       lock.Lock()
+       wg.Add(1)
        err := client.Request(request, 3*time.Second, rsp)
        assert.NoError(t, err)
        assert.Equal(t, User{}, *user)
-       time.Sleep(1 * time.Second)
+       wg.Done()
 }
 
 func InitTest(t *testing.T) (*Server, *common.URL) {
@@ -436,6 +436,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req 
[]interface{}, rsp *User
 }
 
 func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) 
{
+       // fix testClient_AsyncCall assertion bug(#1233)
+       time.Sleep(1 * time.Second)
        return User{ID: id, Name: name}, nil
 }
 
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 2c97a12..5123197 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+       uatomic "go.uber.org/atomic"
        "path"
        "strings"
        "sync"
@@ -38,13 +39,13 @@ import (
        "dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
-var defaultTTL = 15 * time.Minute
+var defaultTTL = 10 * time.Minute
 
 // nolint
 type ZkEventListener struct {
        client      *gxzookeeper.ZookeeperClient
        pathMapLock sync.Mutex
-       pathMap     map[string]struct{}
+       pathMap     map[string]*uatomic.Int32
        wg          sync.WaitGroup
        exit        chan struct{}
 }
@@ -53,7 +54,7 @@ type ZkEventListener struct {
 func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
        return &ZkEventListener{
                client:  client,
-               pathMap: make(map[string]struct{}),
+               pathMap: make(map[string]*uatomic.Int32),
                exit:    make(chan struct{}),
        }
 }
@@ -81,6 +82,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath 
string, listener remotin
 // nolint
 func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener 
...remoting.DataListener) bool {
        defer l.wg.Done()
+
+       l.pathMapLock.Lock()
+       a, ok := l.pathMap[zkPath]
+       if !ok || a.Load() > 1 {
+               l.pathMapLock.Unlock()
+               return false
+       }
+       a.Inc()
+       l.pathMapLock.Unlock()
+       defer a.Dec()
+
        var zkEvent zk.Event
        for {
                keyEventCh, err := l.client.ExistW(zkPath)
@@ -158,9 +170,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                newNode string
        )
        for _, n := range newChildren {
-               if contains(children, n) {
-                       continue
-               }
 
                newNode = path.Join(zkPath, n)
                logger.Infof("add zkNode{%s}", newNode)
@@ -176,6 +185,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, 
children []string, li
                // listen l service node
                l.wg.Add(1)
                go func(node string, listener remoting.DataListener) {
+                       // invoker l.wg.Done() in l.listenServiceNodeEvent
                        if l.listenServiceNodeEvent(node, listener) {
                                logger.Warnf("delete zkNode{%s}", node)
                                listener.DataChange(remoting.Event{Path: node, 
Action: remoting.EventTypeDel})
@@ -276,15 +286,15 @@ func (l *ZkEventListener) listenDirEvent(conf 
*common.URL, zkPath string, listen
                        // Save the path to avoid listen repeatedly
                        l.pathMapLock.Lock()
                        _, ok := l.pathMap[dubboPath]
+                       if !ok {
+                               l.pathMap[dubboPath] = uatomic.NewInt32(0)
+                       }
                        l.pathMapLock.Unlock()
                        if ok {
                                logger.Warnf("@zkPath %s has already been 
listened.", dubboPath)
                                continue
                        }
 
-                       l.pathMapLock.Lock()
-                       l.pathMap[dubboPath] = struct{}{}
-                       l.pathMapLock.Unlock()
                        // When Zk disconnected, the Conn will be set to nil, 
so here need check the value of Conn
                        l.client.RLock()
                        if l.client.Conn == nil {
@@ -303,6 +313,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, 
zkPath string, listen
                        logger.Infof("listen dubbo service key{%s}", dubboPath)
                        l.wg.Add(1)
                        go func(zkPath string, listener remoting.DataListener) {
+                               // invoker l.wg.Done() in 
l.listenServiceNodeEvent
                                if l.listenServiceNodeEvent(zkPath, listener) {
                                        
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
                                        l.pathMapLock.Lock()
@@ -324,12 +335,24 @@ func (l *ZkEventListener) listenDirEvent(conf 
*common.URL, zkPath string, listen
                        }
                }
                // Periodically update provider information
-               ticker := time.NewTicker(ttl)
+               tickerTTL := ttl
+               if tickerTTL > 20e9 {
+                       tickerTTL = 20e9
+               }
+               ticker := time.NewTicker(tickerTTL)
        WATCH:
                for {
                        select {
                        case <-ticker.C:
                                l.handleZkNodeEvent(zkPath, children, listener)
+                               if tickerTTL < ttl {
+                                       tickerTTL *= 2
+                                       if tickerTTL > ttl {
+                                               tickerTTL = ttl
+                                       }
+                                       ticker.Stop()
+                                       ticker = time.NewTicker(tickerTTL)
+                               }
                        case zkEvent = <-childEventCh:
                                logger.Warnf("get a zookeeper 
childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
                                        zkEvent.Type.String(), zkEvent.Server, 
zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), 
zkEvent.Err)

Reply via email to