This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 5c2e47e Fix #1166 (#1249)
5c2e47e is described below
commit 5c2e47e182bb95d0ecd0c4911de2b8a4cb5ef0ff
Author: XavierNiu <[email protected]>
AuthorDate: Mon Jun 14 15:37:09 2021 +0800
Fix #1166 (#1249)
---
registry/zookeeper/service_discovery_test.go | 25 ++++++++++------
remoting/getty/getty_client_test.go | 10 ++++---
remoting/zookeeper/listener.go | 43 +++++++++++++++++++++-------
3 files changed, 55 insertions(+), 23 deletions(-)
diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go
index 5d3d172..512031b 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -43,16 +43,15 @@ import (
"dubbo.apache.org/dubbo-go/v3/registry/event"
)
-var testName = "test"
-
-var tc *zk.TestCluster
+const testName = "test"
func prepareData(t *testing.T) *zk.TestCluster {
var err error
- tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
+ tc, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
+ //address := "127.0.0.1:2181"
config.GetBaseConfig().ServiceDiscoveries[testName] =
&config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
@@ -71,6 +70,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err := newZookeeperServiceDiscovery(name)
// the ServiceDiscoveryConfig not found
+ // err: could not init the instance because the config is invalid
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
@@ -81,10 +81,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err = newZookeeperServiceDiscovery(name)
// RemoteConfig not found
+ // err: could not find the remote config for name: mock
assert.NotNil(t, err)
}
-func TestCURDZookeeperServiceDiscovery(t *testing.T) {
+func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
+ tc := prepareData(t)
+ defer func() {
+ _ = tc.Stop()
+ }()
+ t.Run("testCURDZookeeperServiceDiscovery",
testCURDZookeeperServiceDiscovery)
+ t.Run("testAddListenerZookeeperServiceDiscovery",
testAddListenerZookeeperServiceDiscovery)
+}
+
+func testCURDZookeeperServiceDiscovery(t *testing.T) {
prepareData(t)
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return dispatcher.NewMockEventDispatcher()
@@ -164,10 +174,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
assert.Nil(t, err)
}
-func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
- defer func() {
- _ = tc.Stop()
- }()
+func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go
index a308462..a5b26ba 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -297,7 +297,7 @@ func testGetUser61(t *testing.T, c *Client) {
func testClient_AsyncCall(t *testing.T, client *Client) {
user := &User{}
- lock := sync.Mutex{}
+ wg := sync.WaitGroup{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4",
nil, "username"},
[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil),
reflect.ValueOf("username")})
@@ -314,13 +314,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
r := response.(remoting.AsyncCallbackResponse)
rst :=
*r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{ID: "4", Name: "username"},
*(rst.Rest.(*User)))
- lock.Unlock()
+ wg.Done()
}
- lock.Lock()
+ wg.Add(1)
err := client.Request(request, 3*time.Second, rsp)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
- time.Sleep(1 * time.Second)
+ wg.Done()
}
func InitTest(t *testing.T) (*Server, *common.URL) {
@@ -436,6 +436,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User
}
func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error)
{
+ // fix testClient_AsyncCall assertion bug(#1233)
+ time.Sleep(1 * time.Second)
return User{ID: id, Name: name}, nil
}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 2c97a12..5123197 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
package zookeeper
import (
+ uatomic "go.uber.org/atomic"
"path"
"strings"
"sync"
@@ -38,13 +39,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)
-var defaultTTL = 15 * time.Minute
+var defaultTTL = 10 * time.Minute
// nolint
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
- pathMap map[string]struct{}
+ pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
@@ -53,7 +54,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
- pathMap: make(map[string]struct{}),
+ pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
@@ -81,6 +82,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener
...remoting.DataListener) bool {
defer l.wg.Done()
+
+ l.pathMapLock.Lock()
+ a, ok := l.pathMap[zkPath]
+ if !ok || a.Load() > 1 {
+ l.pathMapLock.Unlock()
+ return false
+ }
+ a.Inc()
+ l.pathMapLock.Unlock()
+ defer a.Dec()
+
var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
@@ -158,9 +170,6 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode string
)
for _, n := range newChildren {
- if contains(children, n) {
- continue
- }
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
@@ -176,6 +185,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
+ // invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node,
Action: remoting.EventTypeDel})
@@ -276,15 +286,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
+ if !ok {
+ l.pathMap[dubboPath] = uatomic.NewInt32(0)
+ }
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been
listened.", dubboPath)
continue
}
- l.pathMapLock.Lock()
- l.pathMap[dubboPath] = struct{}{}
- l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
@@ -303,6 +313,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
+ // invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
@@ -324,12 +335,24 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
}
}
// Periodically update provider information
- ticker := time.NewTicker(ttl)
+ tickerTTL := ttl
+ if tickerTTL > 20e9 {
+ tickerTTL = 20e9
+ }
+ ticker := time.NewTicker(tickerTTL)
WATCH:
for {
select {
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
+ if tickerTTL < ttl {
+ tickerTTL *= 2
+ if tickerTTL > ttl {
+ tickerTTL = ttl
+ }
+ ticker.Stop()
+ ticker = time.NewTicker(tickerTTL)
+ }
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper
childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server,
zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State),
zkEvent.Err)