This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new d0a90bc41 Add unit test of config center (#3134)
d0a90bc41 is described below

commit d0a90bc4120d05a39bdf109e4c41d13bbb14daad
Author: Sirui Huang <[email protected]>
AuthorDate: Sat Dec 27 15:56:12 2025 +0800

    Add unit test of config center (#3134)
    
    * add the unit test
---
 config_center/apollo/listener_test.go              |  72 +++++++++++
 config_center/file/impl_test.go                    |   2 +-
 config_center/file/listener.go                     |  60 ++++++---
 config_center/file/listener_test.go                |  87 +++++++++++++
 config_center/nacos/listener_test.go               |  66 ++++++++++
 config_center/options_test.go                      |  88 ++++++++++++++
 config_center/parser/configuration_parser_test.go  |  39 ++++++
 config_center/zookeeper/impl.go                    |  19 +--
 config_center/zookeeper/impl_test.go               | 135 +++++++++++++++++++++
 config_center/zookeeper/listener_test.go           |  86 +++++++++++++
 protocol/triple/triple_protocol/triple_ext_test.go |  16 ---
 11 files changed, 629 insertions(+), 41 deletions(-)

diff --git a/config_center/apollo/listener_test.go b/config_center/apollo/listener_test.go
new file mode 100644
index 000000000..06b1fab43
--- /dev/null
+++ b/config_center/apollo/listener_test.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package apollo
+
+import (
+       "testing"
+)
+
+import (
+       "github.com/apolloconfig/agollo/v4/storage"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/config_center"
+       "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recordingListener struct {
+       events []*config_center.ConfigChangeEvent
+}
+
+func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) {
+       r.events = append(r.events, e)
+}
+
+func TestApolloListener(t *testing.T) {
+       l := newApolloListener()
+       rec := &recordingListener{}
+
+       // add/remove idempotent
+       l.AddListener(rec)
+       l.AddListener(rec)
+       l.RemoveListener(rec)
+       l.AddListener(rec)
+
+       change := &storage.FullChangeEvent{
+               Changes: map[string]any{"k": "v"},
+       }
+       change.Namespace = "application"
+       l.OnNewestChange(change)
+
+       if len(rec.events) != 1 {
+               t.Fatalf("expected 1 event, got %d", len(rec.events))
+       }
+       ev := rec.events[0]
+       if ev.Key != "application" || ev.ConfigType != remoting.EventTypeUpdate 
{
+               t.Fatalf("unexpected event %+v", ev)
+       }
+       if l.IsEmpty() {
+               t.Fatalf("listener should not be empty after add")
+       }
+
+       l.RemoveListener(rec)
+       if !l.IsEmpty() {
+               t.Fatalf("listener should be empty after remove")
+       }
+}
diff --git a/config_center/file/impl_test.go b/config_center/file/impl_test.go
index db60c3081..9696fd83b 100644
--- a/config_center/file/impl_test.go
+++ b/config_center/file/impl_test.go
@@ -35,7 +35,7 @@ import (
 )
 
 const (
-       key = "com.dubbo.go"
+       key = "com_dubbo_go"
 )
 
 func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) {
diff --git a/config_center/file/listener.go b/config_center/file/listener.go
index 60c7d82aa..5f491d043 100644
--- a/config_center/file/listener.go
+++ b/config_center/file/listener.go
@@ -38,6 +38,7 @@ import (
 type CacheListener struct {
        watch        *fsnotify.Watcher
        keyListeners sync.Map
+       contentCache sync.Map
        rootPath     string
 }
 
@@ -52,28 +53,51 @@ func NewCacheListener(rootPath string) *CacheListener {
        go func() {
                for {
                        select {
-                       case event := <-watch.Events:
+                       case event, ok := <-watch.Events:
+                               if !ok {
+                                       return
+                               }
                                key := event.Name
+                               if key == "" {
+                                       continue
+                               }
                                logger.Debugf("watcher %s, event %v", 
cl.rootPath, event)
+                               if event.Op&fsnotify.Remove == fsnotify.Remove {
+                                       cl.contentCache.Delete(key)
+                                       if l, ok := cl.keyListeners.Load(key); 
ok {
+                                               
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, 
remoting.EventTypeDel)
+                                       }
+                               }
                                if event.Op&fsnotify.Write == fsnotify.Write {
+                                       content := getFileContent(key)
+                                       if prev, ok := 
cl.contentCache.Load(key); ok {
+                                               if prevStr, ok := 
prev.(string); ok && prevStr == content {
+                                                       continue
+                                               }
+                                       }
+                                       cl.contentCache.Store(key, content)
                                        if l, ok := cl.keyListeners.Load(key); 
ok {
-                                               
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
+                                               
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, 
content,
                                                        
remoting.EventTypeUpdate)
                                        }
                                }
                                if event.Op&fsnotify.Create == fsnotify.Create {
+                                       content := getFileContent(key)
+                                       if prev, ok := 
cl.contentCache.Load(key); ok {
+                                               if prevStr, ok := 
prev.(string); ok && prevStr == content {
+                                                       continue
+                                               }
+                                       }
+                                       cl.contentCache.Store(key, content)
                                        if l, ok := cl.keyListeners.Load(key); 
ok {
-                                               
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
+                                               
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, 
content,
                                                        remoting.EventTypeAdd)
                                        }
                                }
-                               if event.Op&fsnotify.Remove == fsnotify.Remove {
-                                       if l, ok := cl.keyListeners.Load(key); 
ok {
-                                               
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, 
remoting.EventTypeDel)
-                                       }
+                       case err, ok := <-watch.Errors:
+                               if !ok {
+                                       return
                                }
-                       case err := <-watch.Errors:
-                               // err may be nil, ignore
                                if err != nil {
                                        logger.Warnf("file : listen watch 
fail:%+v", err)
                                }
@@ -99,14 +123,13 @@ func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key s
        }
 }
 
-func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, 
key string, event remoting.EventType) {
+func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, 
key, content string, event remoting.EventType) {
        if len(lmap) == 0 {
                logger.Warnf("file watch callback but configuration listener is 
empty, key:%s, event:%v", key, event)
                return
        }
-       c := getFileContent(key)
        for l := range lmap {
-               callback(l, key, c, event)
+               callback(l, key, content, event)
        }
 }
 
@@ -147,10 +170,17 @@ func (cl *CacheListener) RemoveListener(key string, listener config_center.Confi
        if !loaded {
                return
        }
-       delete(listeners.(map[config_center.ConfigurationListener]struct{}), 
listener)
-       if err := cl.watch.Remove(key); err != nil {
-               logger.Errorf("watcher remove path:%s err:%v", key, err)
+       lmap := listeners.(map[config_center.ConfigurationListener]struct{})
+       delete(lmap, listener)
+       if len(lmap) == 0 {
+               cl.keyListeners.Delete(key)
+               cl.contentCache.Delete(key)
+               if err := cl.watch.Remove(key); err != nil {
+                       logger.Errorf("watcher remove path:%s err:%v", key, err)
+               }
+               return
        }
+       cl.keyListeners.Store(key, lmap)
 }
 
 func getFileContent(path string) string {
diff --git a/config_center/file/listener_test.go b/config_center/file/listener_test.go
new file mode 100644
index 000000000..624e35a5b
--- /dev/null
+++ b/config_center/file/listener_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package file
+
+import (
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/config_center"
+       "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recListener struct {
+       ch chan *config_center.ConfigChangeEvent
+}
+
+func (r *recListener) Process(e *config_center.ConfigChangeEvent) {
+       select {
+       case r.ch <- e:
+       default:
+       }
+}
+
+func TestCacheListenerCallbacks(t *testing.T) {
+       dir := t.TempDir()
+       filePath := filepath.Join(dir, "cfg.txt")
+       if err := os.WriteFile(filePath, []byte("init"), 0o644); err != nil {
+               t.Fatalf("write file error %v", err)
+       }
+
+       cl := NewCacheListener(dir)
+
+       rec := &recListener{ch: make(chan *config_center.ConfigChangeEvent, 4)}
+       cl.AddListener(filePath, rec)
+
+       // update event
+       if err := os.WriteFile(filePath, []byte("update"), 0o644); err != nil {
+               t.Fatalf("write file error %v", err)
+       }
+       waitEvent(t, rec.ch, remoting.EventTypeUpdate)
+
+       // remove listener then cleanup
+       cl.RemoveListener(filePath, rec)
+       if err := cl.Close(); err != nil {
+               t.Fatalf("close watcher error %v", err)
+       }
+       if err := os.Remove(filePath); err != nil {
+               t.Fatalf("remove file error %v", err)
+       }
+       select {
+       case <-rec.ch:
+               // should not receive after removal
+               t.Fatalf("unexpected event after remove")
+       case <-time.After(200 * time.Millisecond):
+       }
+}
+
+func waitEvent(t *testing.T, ch <-chan *config_center.ConfigChangeEvent, 
expect remoting.EventType) {
+       t.Helper()
+       select {
+       case ev := <-ch:
+               if ev.ConfigType != expect {
+                       t.Fatalf("expected %v, got %v", expect, ev.ConfigType)
+               }
+       case <-time.After(2 * time.Second):
+               t.Fatalf("timeout waiting for event %v", expect)
+       }
+}
diff --git a/config_center/nacos/listener_test.go b/config_center/nacos/listener_test.go
new file mode 100644
index 000000000..00ceb7f1d
--- /dev/null
+++ b/config_center/nacos/listener_test.go
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+       "sync"
+       "testing"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/config_center"
+       "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recordingListener struct {
+       events []*config_center.ConfigChangeEvent
+}
+
+func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) {
+       r.events = append(r.events, e)
+}
+
+func TestCallback(t *testing.T) {
+       l := &recordingListener{}
+       var m sync.Map
+       m.Store(l, struct{}{})
+
+       callback(&m, "", "g", "data", "payload")
+
+       if len(l.events) != 1 {
+               t.Fatalf("expected 1 event, got %d", len(l.events))
+       }
+       if l.events[0].Key != "data" || l.events[0].Value != "payload" || 
l.events[0].ConfigType != remoting.EventTypeUpdate {
+               t.Fatalf("unexpected event %+v", l.events[0])
+       }
+}
+
+func TestRemoveListener(t *testing.T) {
+       n := &nacosDynamicConfiguration{}
+       key := "k"
+       l := &recordingListener{}
+       inner := &sync.Map{}
+       inner.Store(l, struct{}{})
+       n.keyListeners.Store(key, inner)
+
+       n.removeListener(key, l)
+
+       if _, ok := inner.Load(l); ok {
+               t.Fatalf("listener should be removed")
+       }
+}
diff --git a/config_center/options_test.go b/config_center/options_test.go
new file mode 100644
index 000000000..6a7ea5ab7
--- /dev/null
+++ b/config_center/options_test.go
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config_center
+
+import (
+       "testing"
+       "time"
+)
+
+import (
+       "github.com/stretchr/testify/assert"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/constant/file"
+       "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+func TestOptionsProtocolAndAddress(t *testing.T) {
+       cases := []struct {
+               name     string
+               opts     []Option
+               expected string
+       }{
+               {name: "zookeeper", opts: []Option{WithZookeeper()}, expected: 
constant.ZookeeperKey},
+               {name: "nacos", opts: []Option{WithNacos()}, expected: 
constant.NacosKey},
+               {name: "apollo", opts: []Option{WithApollo()}, expected: 
constant.ApolloKey},
+               {name: "custom", opts: []Option{WithConfigCenter("etcd")}, 
expected: "etcd"},
+               {name: "address with scheme", opts: 
[]Option{WithAddress("zk://127.0.0.1:2181")}, expected: "zk"},
+       }
+
+       for _, tt := range cases {
+               t.Run(tt.name, func(t *testing.T) {
+                       co := NewOptions(tt.opts...)
+                       assert.Equal(t, tt.expected, co.Center.Protocol)
+               })
+       }
+}
+
+func TestOptionsFields(t *testing.T) {
+       center := &global.CenterConfig{}
+       opts := []Option{
+               WithDataID("dataId"),
+               WithCluster("cluster"),
+               WithGroup("group"),
+               WithUsername("user"),
+               WithPassword("pwd"),
+               WithNamespace("ns"),
+               WithAppID("app"),
+               WithTimeout(1500 * time.Millisecond),
+               WithParams(map[string]string{"k": "v"}),
+               WithFile(),
+               WithFileExtJson(),
+       }
+
+       wrapped := &Options{Center: center}
+       for _, o := range opts {
+               o(wrapped)
+       }
+
+       assert.Equal(t, "dataId", center.DataId)
+       assert.Equal(t, "cluster", center.Cluster)
+       assert.Equal(t, "group", center.Group)
+       assert.Equal(t, "user", center.Username)
+       assert.Equal(t, "pwd", center.Password)
+       assert.Equal(t, "ns", center.Namespace)
+       assert.Equal(t, "app", center.AppID)
+       assert.Equal(t, "1500", center.Timeout)
+       assert.Equal(t, map[string]string{"k": "v"}, center.Params)
+       assert.Equal(t, constant.FileKey, center.Protocol)
+       assert.Equal(t, string(file.JSON), center.FileExtension)
+}
diff --git a/config_center/parser/configuration_parser_test.go b/config_center/parser/configuration_parser_test.go
index be2d45b25..ffffc8463 100644
--- a/config_center/parser/configuration_parser_test.go
+++ b/config_center/parser/configuration_parser_test.go
@@ -60,6 +60,34 @@ configs:
        assert.Equal(t, "0.0.0.0", urls[0].Location)
 }
 
+func TestDefaultConfigurationParserAppScopeDefaults(t *testing.T) {
+       parser := &DefaultConfigurationParser{}
+       content := `configVersion: 3.0.0
+scope: application
+key: app-key
+enabled: true
+configs:
+- type: custom
+  enabled: false
+  addresses: []
+  providerAddresses: []
+  services: []
+  applications: []
+  parameters:
+    mock: v
+  side: consumer`
+       urls, err := parser.ParseToUrls(content)
+       assert.NoError(t, err)
+       assert.Equal(t, 1, len(urls))
+       assert.Equal(t, "override", urls[0].Protocol)
+       assert.Equal(t, "0.0.0.0", urls[0].Location)
+       assert.Equal(t, "*", urls[0].Service())
+       assert.Equal(t, "app-key", urls[0].GetParam("application", ""))
+       assert.Equal(t, "dynamicconfigurators", urls[0].GetParam("category", 
""))
+       assert.Equal(t, "3.0.0", urls[0].GetParam("configVersion", ""))
+       assert.Equal(t, "false", urls[0].GetParam("enabled", ""))
+}
+
 func TestDefaultConfigurationParserServiceItemToUrls_ParserToUrls(t 
*testing.T) {
        parser := &DefaultConfigurationParser{}
        content := `configVersion: 2.7.1
@@ -87,3 +115,14 @@ configs:
        assert.Equal(t, "override", urls[0].Protocol)
        assert.Equal(t, "0.0.0.0", urls[0].Location)
 }
+
+func TestGetEnabledString(t *testing.T) {
+       item := ConfigItem{Enabled: false}
+       cfg := ConfiguratorConfig{Enabled: true}
+       // when type empty/general use config.enabled
+       assert.Equal(t, "&enabled=true", getEnabledString(item, cfg))
+
+       item.Type = "custom"
+       item.Enabled = false
+       assert.Equal(t, "&enabled=false", getEnabledString(item, cfg))
+}
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index a0f2efd4a..1544706cf 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -20,6 +20,7 @@ package zookeeper
 import (
        "encoding/base64"
        "errors"
+       "path"
        "strconv"
        "strings"
        "sync"
@@ -111,12 +112,12 @@ func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_
 
 // buildPath build path and format
 func buildPath(rootPath, subPath string) string {
-       path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator)
-       if !strings.HasPrefix(path, pathSeparator) {
-               path = pathSeparator + path
+       fullPath := strings.TrimRight(rootPath+pathSeparator+subPath, 
pathSeparator)
+       if !strings.HasPrefix(fullPath, pathSeparator) {
+               fullPath = pathSeparator + fullPath
        }
-       path = strings.ReplaceAll(path, "//", "/")
-       return path
+
+       return path.Clean(fullPath)
 }
 
 func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener 
config_center.ConfigurationListener, opions ...config_center.Option) {
@@ -185,8 +186,8 @@ func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string,
 
 // RemoveConfig will remove the config with the (key, group) pair
 func (c *zookeeperDynamicConfiguration) RemoveConfig(key string, group string) 
error {
-       path := c.getPath(key, group)
-       err := c.client.Delete(path)
+       fullPath := c.getPath(key, group)
+       err := c.client.Delete(fullPath)
        if err != nil {
                return perrors.WithStack(err)
        }
@@ -195,8 +196,8 @@ func (c *zookeeperDynamicConfiguration) RemoveConfig(key string, group string) e
 
 // GetConfigKeysByGroup will return all keys with the group
 func (c *zookeeperDynamicConfiguration) GetConfigKeysByGroup(group string) 
(*gxset.HashSet, error) {
-       path := c.getPath("", group)
-       result, err := c.client.GetChildren(path)
+       fullPath := c.getPath("", group)
+       result, err := c.client.GetChildren(fullPath)
        if err != nil {
                return nil, perrors.WithStack(err)
        }
diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go
new file mode 100644
index 000000000..4fa3fa477
--- /dev/null
+++ b/config_center/zookeeper/impl_test.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+       "errors"
+       "testing"
+)
+
+import (
+       "github.com/dubbogo/go-zookeeper/zk"
+
+       gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+
+       "github.com/stretchr/testify/require"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/config_center"
+)
+
+func TestBuildPath(t *testing.T) {
+       tests := []struct {
+               root     string
+               sub      string
+               expected string
+       }{
+               {root: "/dubbo/config", sub: "group/key", expected: 
"/dubbo/config/group/key"},
+               {root: "/dubbo/config/", sub: "/group/key/", expected: 
"/dubbo/config/group/key"},
+               {root: "dubbo/config", sub: "group/key", expected: 
"/dubbo/config/group/key"},
+       }
+       for _, tt := range tests {
+               if got := buildPath(tt.root, tt.sub); got != tt.expected {
+                       t.Fatalf("buildPath(%q,%q) = %q, want %q", tt.root, 
tt.sub, got, tt.expected)
+               }
+       }
+}
+
+func TestGetPath(t *testing.T) {
+       cfg := &zookeeperDynamicConfiguration{rootPath: "/root"}
+
+       if got := cfg.getPath("k", "g"); got != "/root/g/k" {
+               t.Fatalf("getPath with group returned %q", got)
+       }
+       if got := cfg.getPath("", "g"); got != "/root/g" {
+               t.Fatalf("getPath empty key returned %q", got)
+       }
+       if got := cfg.getPath("k", ""); got != 
"/root/"+config_center.DefaultGroup+"/k" {
+               t.Fatalf("getPath default group returned %q", got)
+       }
+}
+
+func TestPublishAndRemoveConfigWithMockZk(t *testing.T) {
+       cluster, client, _, err := gxzookeeper.NewMockZookeeperClient("test", 
5e9)
+       if err != nil {
+               t.Skipf("skip mock zk setup: %v", err)
+       }
+       defer cluster.Stop()
+
+       cfg := &zookeeperDynamicConfiguration{
+               rootPath: "/dubbo/config",
+               client:   client,
+               done:     make(chan struct{}),
+               url:      mustURL(t, "registry://127.0.0.1:2181"),
+       }
+
+       err = cfg.PublishConfig("k", "g", "v1")
+       require.NoError(t, err)
+
+       content, _, err := client.GetContent("/dubbo/config/g/k")
+       require.NoError(t, err)
+       require.Equal(t, "v1", string(content))
+
+       // update existing node path
+       err = cfg.PublishConfig("k", "g", "v2")
+       require.NoError(t, err)
+       content, _, err = client.GetContent("/dubbo/config/g/k")
+       require.NoError(t, err)
+       require.Equal(t, "v2", string(content))
+
+       // remove
+       err = cfg.RemoveConfig("k", "g")
+       require.NoError(t, err)
+       _, _, err = client.GetContent("/dubbo/config/g/k")
+       require.True(t, errors.Is(err, zk.ErrNoNode))
+}
+
+func TestGetPropertiesWithMockZk(t *testing.T) {
+       cluster, client, _, err := gxzookeeper.NewMockZookeeperClient("test2", 
5e9)
+       if err != nil {
+               t.Skipf("skip mock zk setup: %v", err)
+       }
+       defer cluster.Stop()
+
+       cfg := &zookeeperDynamicConfiguration{
+               rootPath: "/dubbo/config",
+               client:   client,
+               done:     make(chan struct{}),
+               url:      mustURL(t, "registry://127.0.0.1:2181"),
+       }
+
+       require.NoError(t, cfg.PublishConfig("file.properties", "grp", "val"))
+
+       val, err := cfg.GetProperties("file.properties", 
config_center.WithGroup("grp"))
+       require.NoError(t, err)
+       require.Equal(t, "val", val)
+
+       // non-existing returns empty string and nil error
+       empty, err := cfg.GetProperties("missing", 
config_center.WithGroup("grp"))
+       require.NoError(t, err)
+       require.Equal(t, "", empty)
+}
+
+func mustURL(t *testing.T, raw string) *common.URL {
+       t.Helper()
+       u, err := common.NewURL(raw)
+       require.NoError(t, err)
+       return u
+}
diff --git a/config_center/zookeeper/listener_test.go b/config_center/zookeeper/listener_test.go
new file mode 100644
index 000000000..7835ad8b4
--- /dev/null
+++ b/config_center/zookeeper/listener_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+       "testing"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/config_center"
+       "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type recListener struct {
+       events []*config_center.ConfigChangeEvent
+}
+
+func (r *recListener) Process(e *config_center.ConfigChangeEvent) {
+       r.events = append(r.events, e)
+}
+
+func TestCacheListenerDataChange(t *testing.T) {
+       l := &CacheListener{rootPath: "/dubbo/config"}
+       path := "/dubbo/config/group/app"
+       rec := &recListener{}
+       l.keyListeners.Store(path, 
map[config_center.ConfigurationListener]struct{}{rec: {}})
+
+       ok := l.DataChange(remoting.Event{Path: path, Action: 
remoting.EventTypeUpdate, Content: "val"})
+       if !ok {
+               t.Fatalf("expected listeners to be notified")
+       }
+       if len(rec.events) != 1 || rec.events[0].Value != "val" || 
rec.events[0].ConfigType != remoting.EventTypeUpdate {
+               t.Fatalf("unexpected events %+v", rec.events)
+       }
+}
+
+func TestCacheListenerDataChangeEmptyContent(t *testing.T) {
+       l := &CacheListener{rootPath: "/dubbo/config"}
+       path := "/dubbo/config/group/app"
+       rec := &recListener{}
+       l.keyListeners.Store(path, 
map[config_center.ConfigurationListener]struct{}{rec: {}})
+
+       ok := l.DataChange(remoting.Event{Path: path, Action: 
remoting.EventTypeAdd})
+       if !ok {
+               t.Fatalf("expected listeners to be notified")
+       }
+       if len(rec.events) != 1 || rec.events[0].ConfigType != 
remoting.EventTypeDel {
+               t.Fatalf("unexpected events %+v", rec.events)
+       }
+}
+
+func TestCacheListenerPathToKeyGroup(t *testing.T) {
+       l := &CacheListener{rootPath: "/dubbo/config"}
+       key, group := l.pathToKeyGroup("/dubbo/config/g/app")
+       if key != "app" || group != "g" {
+               t.Fatalf("unexpected key/group %s %s", key, group)
+       }
+}
+
+func TestCacheListenerRemoveListener(t *testing.T) {
+       l := &CacheListener{}
+       key := "k"
+       rec := &recListener{}
+       l.keyListeners.Store(key, 
map[config_center.ConfigurationListener]struct{}{rec: {}})
+       l.RemoveListener(key, rec)
+       if m, ok := l.keyListeners.Load(key); ok {
+               if _, exists := 
m.(map[config_center.ConfigurationListener]struct{})[rec]; exists {
+                       t.Fatalf("listener should be removed")
+               }
+       }
+}
diff --git a/protocol/triple/triple_protocol/triple_ext_test.go b/protocol/triple/triple_protocol/triple_ext_test.go
index 6a08d0da6..0670748d5 100644
--- a/protocol/triple/triple_protocol/triple_ext_test.go
+++ b/protocol/triple/triple_protocol/triple_ext_test.go
@@ -1654,22 +1654,6 @@ func TestStreamForServer(t *testing.T) {
                assert.NotNil(t, res)
                assert.Equal(t, msg.Sum, int64(1))
        })
-       t.Run("client-stream-conn", func(t *testing.T) {
-               t.Parallel()
-               client, server := newPingServer(&pluggablePingServer{
-                       sum: func(ctx context.Context, stream 
*triple.ClientStream) (*triple.Response, error) {
-                               assert.NotNil(t, 
stream.Conn().Send("not-proto"))
-                               return 
triple.NewResponse(&pingv1.SumResponse{}), nil
-                       },
-               })
-               t.Cleanup(server.Close)
-               stream, err := client.Sum(context.Background())
-               assert.Nil(t, err)
-               assert.Nil(t, stream.Send(&pingv1.SumRequest{Number: 1}))
-               res := triple.NewResponse(&pingv1.SumResponse{})
-               err = stream.CloseAndReceive(res)
-               assert.Nil(t, err)
-       })
        t.Run("client-stream-send-msg", func(t *testing.T) {
                t.Parallel()
                client, server := newPingServer(&pluggablePingServer{

Reply via email to