This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new d0a90bc41 Add unit test of config center (#3134)
d0a90bc41 is described below
commit d0a90bc4120d05a39bdf109e4c41d13bbb14daad
Author: Sirui Huang <[email protected]>
AuthorDate: Sat Dec 27 15:56:12 2025 +0800
Add unit test of config center (#3134)
* add the unit test
---
config_center/apollo/listener_test.go | 72 +++++++++++
config_center/file/impl_test.go | 2 +-
config_center/file/listener.go | 60 ++++++---
config_center/file/listener_test.go | 87 +++++++++++++
config_center/nacos/listener_test.go | 66 ++++++++++
config_center/options_test.go | 88 ++++++++++++++
config_center/parser/configuration_parser_test.go | 39 ++++++
config_center/zookeeper/impl.go | 19 +--
config_center/zookeeper/impl_test.go | 135 +++++++++++++++++++++
config_center/zookeeper/listener_test.go | 86 +++++++++++++
protocol/triple/triple_protocol/triple_ext_test.go | 16 ---
11 files changed, 629 insertions(+), 41 deletions(-)
diff --git a/config_center/apollo/listener_test.go
b/config_center/apollo/listener_test.go
new file mode 100644
index 000000000..06b1fab43
--- /dev/null
+++ b/config_center/apollo/listener_test.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package apollo
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/apolloconfig/agollo/v4/storage"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recordingListener struct {
+ events []*config_center.ConfigChangeEvent
+}
+
+func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) {
+ r.events = append(r.events, e)
+}
+
+func TestApolloListener(t *testing.T) {
+ l := newApolloListener()
+ rec := &recordingListener{}
+
+ // add/remove idempotent
+ l.AddListener(rec)
+ l.AddListener(rec)
+ l.RemoveListener(rec)
+ l.AddListener(rec)
+
+ change := &storage.FullChangeEvent{
+ Changes: map[string]any{"k": "v"},
+ }
+ change.Namespace = "application"
+ l.OnNewestChange(change)
+
+ if len(rec.events) != 1 {
+ t.Fatalf("expected 1 event, got %d", len(rec.events))
+ }
+ ev := rec.events[0]
+ if ev.Key != "application" || ev.ConfigType != remoting.EventTypeUpdate
{
+ t.Fatalf("unexpected event %+v", ev)
+ }
+ if l.IsEmpty() {
+ t.Fatalf("listener should not be empty after add")
+ }
+
+ l.RemoveListener(rec)
+ if !l.IsEmpty() {
+ t.Fatalf("listener should be empty after remove")
+ }
+}
diff --git a/config_center/file/impl_test.go b/config_center/file/impl_test.go
index db60c3081..9696fd83b 100644
--- a/config_center/file/impl_test.go
+++ b/config_center/file/impl_test.go
@@ -35,7 +35,7 @@ import (
)
const (
- key = "com.dubbo.go"
+ key = "com_dubbo_go"
)
func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) {
diff --git a/config_center/file/listener.go b/config_center/file/listener.go
index 60c7d82aa..5f491d043 100644
--- a/config_center/file/listener.go
+++ b/config_center/file/listener.go
@@ -38,6 +38,7 @@ import (
type CacheListener struct {
watch *fsnotify.Watcher
keyListeners sync.Map
+ contentCache sync.Map
rootPath string
}
@@ -52,28 +53,51 @@ func NewCacheListener(rootPath string) *CacheListener {
go func() {
for {
select {
- case event := <-watch.Events:
+ case event, ok := <-watch.Events:
+ if !ok {
+ return
+ }
key := event.Name
+ if key == "" {
+ continue
+ }
logger.Debugf("watcher %s, event %v",
cl.rootPath, event)
+ if event.Op&fsnotify.Remove == fsnotify.Remove {
+ cl.contentCache.Delete(key)
+ if l, ok := cl.keyListeners.Load(key);
ok {
+
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeDel)
+ }
+ }
if event.Op&fsnotify.Write == fsnotify.Write {
+ content := getFileContent(key)
+ if prev, ok :=
cl.contentCache.Load(key); ok {
+ if prevStr, ok :=
prev.(string); ok && prevStr == content {
+ continue
+ }
+ }
+ cl.contentCache.Store(key, content)
if l, ok := cl.keyListeners.Load(key);
ok {
-
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
+
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
content,
remoting.EventTypeUpdate)
}
}
if event.Op&fsnotify.Create == fsnotify.Create {
+ content := getFileContent(key)
+ if prev, ok :=
cl.contentCache.Load(key); ok {
+ if prevStr, ok :=
prev.(string); ok && prevStr == content {
+ continue
+ }
+ }
+ cl.contentCache.Store(key, content)
if l, ok := cl.keyListeners.Load(key);
ok {
-
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
+
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
content,
remoting.EventTypeAdd)
}
}
- if event.Op&fsnotify.Remove == fsnotify.Remove {
- if l, ok := cl.keyListeners.Load(key);
ok {
-
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeDel)
- }
+ case err, ok := <-watch.Errors:
+ if !ok {
+ return
}
- case err := <-watch.Errors:
- // err may be nil, ignore
if err != nil {
logger.Warnf("file : listen watch
fail:%+v", err)
}
@@ -99,14 +123,13 @@ func removeCallback(lmap
map[config_center.ConfigurationListener]struct{}, key s
}
}
-func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{},
key string, event remoting.EventType) {
+func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{},
key, content string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is
empty, key:%s, event:%v", key, event)
return
}
- c := getFileContent(key)
for l := range lmap {
- callback(l, key, c, event)
+ callback(l, key, content, event)
}
}
@@ -147,10 +170,17 @@ func (cl *CacheListener) RemoveListener(key string,
listener config_center.Confi
if !loaded {
return
}
- delete(listeners.(map[config_center.ConfigurationListener]struct{}),
listener)
- if err := cl.watch.Remove(key); err != nil {
- logger.Errorf("watcher remove path:%s err:%v", key, err)
+ lmap := listeners.(map[config_center.ConfigurationListener]struct{})
+ delete(lmap, listener)
+ if len(lmap) == 0 {
+ cl.keyListeners.Delete(key)
+ cl.contentCache.Delete(key)
+ if err := cl.watch.Remove(key); err != nil {
+ logger.Errorf("watcher remove path:%s err:%v", key, err)
+ }
+ return
}
+ cl.keyListeners.Store(key, lmap)
}
func getFileContent(path string) string {
diff --git a/config_center/file/listener_test.go
b/config_center/file/listener_test.go
new file mode 100644
index 000000000..624e35a5b
--- /dev/null
+++ b/config_center/file/listener_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package file
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recListener struct {
+ ch chan *config_center.ConfigChangeEvent
+}
+
+func (r *recListener) Process(e *config_center.ConfigChangeEvent) {
+ select {
+ case r.ch <- e:
+ default:
+ }
+}
+
+func TestCacheListenerCallbacks(t *testing.T) {
+ dir := t.TempDir()
+ filePath := filepath.Join(dir, "cfg.txt")
+ if err := os.WriteFile(filePath, []byte("init"), 0o644); err != nil {
+ t.Fatalf("write file error %v", err)
+ }
+
+ cl := NewCacheListener(dir)
+
+ rec := &recListener{ch: make(chan *config_center.ConfigChangeEvent, 4)}
+ cl.AddListener(filePath, rec)
+
+ // update event
+ if err := os.WriteFile(filePath, []byte("update"), 0o644); err != nil {
+ t.Fatalf("write file error %v", err)
+ }
+ waitEvent(t, rec.ch, remoting.EventTypeUpdate)
+
+ // remove listener then cleanup
+ cl.RemoveListener(filePath, rec)
+ if err := cl.Close(); err != nil {
+ t.Fatalf("close watcher error %v", err)
+ }
+ if err := os.Remove(filePath); err != nil {
+ t.Fatalf("remove file error %v", err)
+ }
+ select {
+ case <-rec.ch:
+ // should not receive after removal
+ t.Fatalf("unexpected event after remove")
+ case <-time.After(200 * time.Millisecond):
+ }
+}
+
+func waitEvent(t *testing.T, ch <-chan *config_center.ConfigChangeEvent,
expect remoting.EventType) {
+ t.Helper()
+ select {
+ case ev := <-ch:
+ if ev.ConfigType != expect {
+ t.Fatalf("expected %v, got %v", expect, ev.ConfigType)
+ }
+ case <-time.After(2 * time.Second):
+ t.Fatalf("timeout waiting for event %v", expect)
+ }
+}
diff --git a/config_center/nacos/listener_test.go
b/config_center/nacos/listener_test.go
new file mode 100644
index 000000000..00ceb7f1d
--- /dev/null
+++ b/config_center/nacos/listener_test.go
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recordingListener struct {
+ events []*config_center.ConfigChangeEvent
+}
+
+func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) {
+ r.events = append(r.events, e)
+}
+
+func TestCallback(t *testing.T) {
+ l := &recordingListener{}
+ var m sync.Map
+ m.Store(l, struct{}{})
+
+ callback(&m, "", "g", "data", "payload")
+
+ if len(l.events) != 1 {
+ t.Fatalf("expected 1 event, got %d", len(l.events))
+ }
+ if l.events[0].Key != "data" || l.events[0].Value != "payload" ||
l.events[0].ConfigType != remoting.EventTypeUpdate {
+ t.Fatalf("unexpected event %+v", l.events[0])
+ }
+}
+
+func TestRemoveListener(t *testing.T) {
+ n := &nacosDynamicConfiguration{}
+ key := "k"
+ l := &recordingListener{}
+ inner := &sync.Map{}
+ inner.Store(l, struct{}{})
+ n.keyListeners.Store(key, inner)
+
+ n.removeListener(key, l)
+
+ if _, ok := inner.Load(l); ok {
+ t.Fatalf("listener should be removed")
+ }
+}
diff --git a/config_center/options_test.go b/config_center/options_test.go
new file mode 100644
index 000000000..6a7ea5ab7
--- /dev/null
+++ b/config_center/options_test.go
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config_center
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/constant/file"
+ "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+func TestOptionsProtocolAndAddress(t *testing.T) {
+ cases := []struct {
+ name string
+ opts []Option
+ expected string
+ }{
+ {name: "zookeeper", opts: []Option{WithZookeeper()}, expected:
constant.ZookeeperKey},
+ {name: "nacos", opts: []Option{WithNacos()}, expected:
constant.NacosKey},
+ {name: "apollo", opts: []Option{WithApollo()}, expected:
constant.ApolloKey},
+ {name: "custom", opts: []Option{WithConfigCenter("etcd")},
expected: "etcd"},
+ {name: "address with scheme", opts:
[]Option{WithAddress("zk://127.0.0.1:2181")}, expected: "zk"},
+ }
+
+ for _, tt := range cases {
+ t.Run(tt.name, func(t *testing.T) {
+ co := NewOptions(tt.opts...)
+ assert.Equal(t, tt.expected, co.Center.Protocol)
+ })
+ }
+}
+
+func TestOptionsFields(t *testing.T) {
+ center := &global.CenterConfig{}
+ opts := []Option{
+ WithDataID("dataId"),
+ WithCluster("cluster"),
+ WithGroup("group"),
+ WithUsername("user"),
+ WithPassword("pwd"),
+ WithNamespace("ns"),
+ WithAppID("app"),
+ WithTimeout(1500 * time.Millisecond),
+ WithParams(map[string]string{"k": "v"}),
+ WithFile(),
+ WithFileExtJson(),
+ }
+
+ wrapped := &Options{Center: center}
+ for _, o := range opts {
+ o(wrapped)
+ }
+
+ assert.Equal(t, "dataId", center.DataId)
+ assert.Equal(t, "cluster", center.Cluster)
+ assert.Equal(t, "group", center.Group)
+ assert.Equal(t, "user", center.Username)
+ assert.Equal(t, "pwd", center.Password)
+ assert.Equal(t, "ns", center.Namespace)
+ assert.Equal(t, "app", center.AppID)
+ assert.Equal(t, "1500", center.Timeout)
+ assert.Equal(t, map[string]string{"k": "v"}, center.Params)
+ assert.Equal(t, constant.FileKey, center.Protocol)
+ assert.Equal(t, string(file.JSON), center.FileExtension)
+}
diff --git a/config_center/parser/configuration_parser_test.go
b/config_center/parser/configuration_parser_test.go
index be2d45b25..ffffc8463 100644
--- a/config_center/parser/configuration_parser_test.go
+++ b/config_center/parser/configuration_parser_test.go
@@ -60,6 +60,34 @@ configs:
assert.Equal(t, "0.0.0.0", urls[0].Location)
}
+func TestDefaultConfigurationParserAppScopeDefaults(t *testing.T) {
+ parser := &DefaultConfigurationParser{}
+ content := `configVersion: 3.0.0
+scope: application
+key: app-key
+enabled: true
+configs:
+- type: custom
+ enabled: false
+ addresses: []
+ providerAddresses: []
+ services: []
+ applications: []
+ parameters:
+ mock: v
+ side: consumer`
+ urls, err := parser.ParseToUrls(content)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(urls))
+ assert.Equal(t, "override", urls[0].Protocol)
+ assert.Equal(t, "0.0.0.0", urls[0].Location)
+ assert.Equal(t, "*", urls[0].Service())
+ assert.Equal(t, "app-key", urls[0].GetParam("application", ""))
+ assert.Equal(t, "dynamicconfigurators", urls[0].GetParam("category",
""))
+ assert.Equal(t, "3.0.0", urls[0].GetParam("configVersion", ""))
+ assert.Equal(t, "false", urls[0].GetParam("enabled", ""))
+}
+
func TestDefaultConfigurationParserServiceItemToUrls_ParserToUrls(t
*testing.T) {
parser := &DefaultConfigurationParser{}
content := `configVersion: 2.7.1
@@ -87,3 +115,14 @@ configs:
assert.Equal(t, "override", urls[0].Protocol)
assert.Equal(t, "0.0.0.0", urls[0].Location)
}
+
+func TestGetEnabledString(t *testing.T) {
+ item := ConfigItem{Enabled: false}
+ cfg := ConfiguratorConfig{Enabled: true}
+ // when type empty/general use config.enabled
+ assert.Equal(t, "&enabled=true", getEnabledString(item, cfg))
+
+ item.Type = "custom"
+ item.Enabled = false
+ assert.Equal(t, "&enabled=false", getEnabledString(item, cfg))
+}
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index a0f2efd4a..1544706cf 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -20,6 +20,7 @@ package zookeeper
import (
"encoding/base64"
"errors"
+ "path"
"strconv"
"strings"
"sync"
@@ -111,12 +112,12 @@ func (c *zookeeperDynamicConfiguration) AddListener(key
string, listener config_
// buildPath build path and format
func buildPath(rootPath, subPath string) string {
- path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator)
- if !strings.HasPrefix(path, pathSeparator) {
- path = pathSeparator + path
+ fullPath := strings.TrimRight(rootPath+pathSeparator+subPath,
pathSeparator)
+ if !strings.HasPrefix(fullPath, pathSeparator) {
+ fullPath = pathSeparator + fullPath
}
- path = strings.ReplaceAll(path, "//", "/")
- return path
+
+ return path.Clean(fullPath)
}
func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener
config_center.ConfigurationListener, opions ...config_center.Option) {
@@ -185,8 +186,8 @@ func (c *zookeeperDynamicConfiguration) PublishConfig(key
string, group string,
// RemoveConfig will remove the config with the (key, group) pair
func (c *zookeeperDynamicConfiguration) RemoveConfig(key string, group string)
error {
- path := c.getPath(key, group)
- err := c.client.Delete(path)
+ fullPath := c.getPath(key, group)
+ err := c.client.Delete(fullPath)
if err != nil {
return perrors.WithStack(err)
}
@@ -195,8 +196,8 @@ func (c *zookeeperDynamicConfiguration) RemoveConfig(key
string, group string) e
// GetConfigKeysByGroup will return all keys with the group
func (c *zookeeperDynamicConfiguration) GetConfigKeysByGroup(group string)
(*gxset.HashSet, error) {
- path := c.getPath("", group)
- result, err := c.client.GetChildren(path)
+ fullPath := c.getPath("", group)
+ result, err := c.client.GetChildren(fullPath)
if err != nil {
return nil, perrors.WithStack(err)
}
diff --git a/config_center/zookeeper/impl_test.go
b/config_center/zookeeper/impl_test.go
new file mode 100644
index 000000000..4fa3fa477
--- /dev/null
+++ b/config_center/zookeeper/impl_test.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "errors"
+ "testing"
+)
+
+import (
+ "github.com/dubbogo/go-zookeeper/zk"
+
+ gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+)
+
+func TestBuildPath(t *testing.T) {
+ tests := []struct {
+ root string
+ sub string
+ expected string
+ }{
+ {root: "/dubbo/config", sub: "group/key", expected:
"/dubbo/config/group/key"},
+ {root: "/dubbo/config/", sub: "/group/key/", expected:
"/dubbo/config/group/key"},
+ {root: "dubbo/config", sub: "group/key", expected:
"/dubbo/config/group/key"},
+ }
+ for _, tt := range tests {
+ if got := buildPath(tt.root, tt.sub); got != tt.expected {
+ t.Fatalf("buildPath(%q,%q) = %q, want %q", tt.root,
tt.sub, got, tt.expected)
+ }
+ }
+}
+
+func TestGetPath(t *testing.T) {
+ cfg := &zookeeperDynamicConfiguration{rootPath: "/root"}
+
+ if got := cfg.getPath("k", "g"); got != "/root/g/k" {
+ t.Fatalf("getPath with group returned %q", got)
+ }
+ if got := cfg.getPath("", "g"); got != "/root/g" {
+ t.Fatalf("getPath empty key returned %q", got)
+ }
+ if got := cfg.getPath("k", ""); got !=
"/root/"+config_center.DefaultGroup+"/k" {
+ t.Fatalf("getPath default group returned %q", got)
+ }
+}
+
+func TestPublishAndRemoveConfigWithMockZk(t *testing.T) {
+ cluster, client, _, err := gxzookeeper.NewMockZookeeperClient("test",
5e9)
+ if err != nil {
+ t.Skipf("skip mock zk setup: %v", err)
+ }
+ defer cluster.Stop()
+
+ cfg := &zookeeperDynamicConfiguration{
+ rootPath: "/dubbo/config",
+ client: client,
+ done: make(chan struct{}),
+ url: mustURL(t, "registry://127.0.0.1:2181"),
+ }
+
+ err = cfg.PublishConfig("k", "g", "v1")
+ require.NoError(t, err)
+
+ content, _, err := client.GetContent("/dubbo/config/g/k")
+ require.NoError(t, err)
+ require.Equal(t, "v1", string(content))
+
+ // update existing node path
+ err = cfg.PublishConfig("k", "g", "v2")
+ require.NoError(t, err)
+ content, _, err = client.GetContent("/dubbo/config/g/k")
+ require.NoError(t, err)
+ require.Equal(t, "v2", string(content))
+
+ // remove
+ err = cfg.RemoveConfig("k", "g")
+ require.NoError(t, err)
+ _, _, err = client.GetContent("/dubbo/config/g/k")
+ require.True(t, errors.Is(err, zk.ErrNoNode))
+}
+
+func TestGetPropertiesWithMockZk(t *testing.T) {
+ cluster, client, _, err := gxzookeeper.NewMockZookeeperClient("test2",
5e9)
+ if err != nil {
+ t.Skipf("skip mock zk setup: %v", err)
+ }
+ defer cluster.Stop()
+
+ cfg := &zookeeperDynamicConfiguration{
+ rootPath: "/dubbo/config",
+ client: client,
+ done: make(chan struct{}),
+ url: mustURL(t, "registry://127.0.0.1:2181"),
+ }
+
+ require.NoError(t, cfg.PublishConfig("file.properties", "grp", "val"))
+
+ val, err := cfg.GetProperties("file.properties",
config_center.WithGroup("grp"))
+ require.NoError(t, err)
+ require.Equal(t, "val", val)
+
+ // non-existing returns empty string and nil error
+ empty, err := cfg.GetProperties("missing",
config_center.WithGroup("grp"))
+ require.NoError(t, err)
+ require.Equal(t, "", empty)
+}
+
+func mustURL(t *testing.T, raw string) *common.URL {
+ t.Helper()
+ u, err := common.NewURL(raw)
+ require.NoError(t, err)
+ return u
+}
diff --git a/config_center/zookeeper/listener_test.go
b/config_center/zookeeper/listener_test.go
new file mode 100644
index 000000000..7835ad8b4
--- /dev/null
+++ b/config_center/zookeeper/listener_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "testing"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recListener struct {
+ events []*config_center.ConfigChangeEvent
+}
+
+func (r *recListener) Process(e *config_center.ConfigChangeEvent) {
+ r.events = append(r.events, e)
+}
+
+func TestCacheListenerDataChange(t *testing.T) {
+ l := &CacheListener{rootPath: "/dubbo/config"}
+ path := "/dubbo/config/group/app"
+ rec := &recListener{}
+ l.keyListeners.Store(path,
map[config_center.ConfigurationListener]struct{}{rec: {}})
+
+ ok := l.DataChange(remoting.Event{Path: path, Action:
remoting.EventTypeUpdate, Content: "val"})
+ if !ok {
+ t.Fatalf("expected listeners to be notified")
+ }
+ if len(rec.events) != 1 || rec.events[0].Value != "val" ||
rec.events[0].ConfigType != remoting.EventTypeUpdate {
+ t.Fatalf("unexpected events %+v", rec.events)
+ }
+}
+
+func TestCacheListenerDataChangeEmptyContent(t *testing.T) {
+ l := &CacheListener{rootPath: "/dubbo/config"}
+ path := "/dubbo/config/group/app"
+ rec := &recListener{}
+ l.keyListeners.Store(path,
map[config_center.ConfigurationListener]struct{}{rec: {}})
+
+ ok := l.DataChange(remoting.Event{Path: path, Action:
remoting.EventTypeAdd})
+ if !ok {
+ t.Fatalf("expected listeners to be notified")
+ }
+ if len(rec.events) != 1 || rec.events[0].ConfigType !=
remoting.EventTypeDel {
+ t.Fatalf("unexpected events %+v", rec.events)
+ }
+}
+
+func TestCacheListenerPathToKeyGroup(t *testing.T) {
+ l := &CacheListener{rootPath: "/dubbo/config"}
+ key, group := l.pathToKeyGroup("/dubbo/config/g/app")
+ if key != "app" || group != "g" {
+ t.Fatalf("unexpected key/group %s %s", key, group)
+ }
+}
+
+func TestCacheListenerRemoveListener(t *testing.T) {
+ l := &CacheListener{}
+ key := "k"
+ rec := &recListener{}
+ l.keyListeners.Store(key,
map[config_center.ConfigurationListener]struct{}{rec: {}})
+ l.RemoveListener(key, rec)
+ if m, ok := l.keyListeners.Load(key); ok {
+ if _, exists :=
m.(map[config_center.ConfigurationListener]struct{})[rec]; exists {
+ t.Fatalf("listener should be removed")
+ }
+ }
+}
diff --git a/protocol/triple/triple_protocol/triple_ext_test.go
b/protocol/triple/triple_protocol/triple_ext_test.go
index 6a08d0da6..0670748d5 100644
--- a/protocol/triple/triple_protocol/triple_ext_test.go
+++ b/protocol/triple/triple_protocol/triple_ext_test.go
@@ -1654,22 +1654,6 @@ func TestStreamForServer(t *testing.T) {
assert.NotNil(t, res)
assert.Equal(t, msg.Sum, int64(1))
})
- t.Run("client-stream-conn", func(t *testing.T) {
- t.Parallel()
- client, server := newPingServer(&pluggablePingServer{
- sum: func(ctx context.Context, stream
*triple.ClientStream) (*triple.Response, error) {
- assert.NotNil(t,
stream.Conn().Send("not-proto"))
- return
triple.NewResponse(&pingv1.SumResponse{}), nil
- },
- })
- t.Cleanup(server.Close)
- stream, err := client.Sum(context.Background())
- assert.Nil(t, err)
- assert.Nil(t, stream.Send(&pingv1.SumRequest{Number: 1}))
- res := triple.NewResponse(&pingv1.SumResponse{})
- err = stream.CloseAndReceive(res)
- assert.Nil(t, err)
- })
t.Run("client-stream-send-msg", func(t *testing.T) {
t.Parallel()
client, server := newPingServer(&pluggablePingServer{