This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 71f0557bf feat: add unit test in remoting and metadata files (#3120)
71f0557bf is described below
commit 71f0557bfb5f9bfa7a670bf17402ec1897bc9087
Author: 陈乐樂 <[email protected]>
AuthorDate: Sat Dec 27 18:00:20 2025 +0800
feat: add unit test in remoting and metadata files (#3120)
* feat: add unit test in remoting and metadata files
---
metadata/options_test.go | 166 ++++++++++++++++
metadata/report/zookeeper/listener_test.go | 166 ++++++++++++++++
metadata/report/zookeeper/report_test.go | 126 ++++++++++++
remoting/codec_test.go | 103 ++++++++++
remoting/etcdv3/client_test.go | 81 ++++++++
remoting/etcdv3/facade_test.go | 69 +++++++
remoting/exchange_client_test.go | 106 +++++++++++
remoting/exchange_server_test.go | 103 ++++++++++
remoting/exchange_test.go | 134 +++++++++++++
remoting/getty/opentracing_test.go | 50 +++++
remoting/getty/pool_test.go | 137 +++++++++++++
remoting/listener_test.go | 102 ++++++++++
remoting/zookeeper/client_test.go | 112 +++++++++++
.../curator_discovery/service_discovery_test.go | 212 +++++++++++++++++++++
remoting/zookeeper/facade_test.go | 67 +++++++
15 files changed, 1734 insertions(+)
diff --git a/metadata/options_test.go b/metadata/options_test.go
new file mode 100644
index 000000000..a1ececf8c
--- /dev/null
+++ b/metadata/options_test.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/global"
+)
+
+func TestNewOptions(t *testing.T) {
+ // Test default options
+ opts := NewOptions()
+ assert.Equal(t, constant.DefaultMetadataStorageType, opts.metadataType)
+ assert.Equal(t, constant.DefaultProtocol, opts.protocol)
+
+ // Test with all options
+ opts = NewOptions(
+ WithAppName("my-app"),
+ WithMetadataType("remote"),
+ WithPort(20880),
+ WithMetadataProtocol("tri"),
+ )
+ assert.Equal(t, "my-app", opts.appName)
+ assert.Equal(t, "remote", opts.metadataType)
+ assert.Equal(t, 20880, opts.port)
+ assert.Equal(t, "tri", opts.protocol)
+}
+
+func TestNewReportOptions(t *testing.T) {
+ // Test default
+ opts := NewReportOptions()
+ assert.NotNil(t, opts.MetadataReportConfig)
+
+ // Test with all options
+ opts = NewReportOptions(
+ WithRegistryId("registry-1"),
+ WithZookeeper(),
+ WithAddress("127.0.0.1:2181"),
+ WithUsername("admin"),
+ WithPassword("secret"),
+ WithTimeout(5*time.Second),
+ WithGroup("test-group"),
+ WithNamespace("test-ns"),
+ WithParams(map[string]string{"key": "value"}),
+ )
+ assert.Equal(t, "registry-1", opts.registryId)
+ assert.Equal(t, constant.ZookeeperKey, opts.Protocol)
+ assert.Equal(t, "127.0.0.1:2181", opts.Address)
+ assert.Equal(t, "admin", opts.Username)
+ assert.Equal(t, "secret", opts.Password)
+ assert.Equal(t, "5000", opts.Timeout)
+ assert.Equal(t, "test-group", opts.Group)
+ assert.Equal(t, "test-ns", opts.Namespace)
+ assert.Equal(t, "value", opts.Params["key"])
+}
+
+func TestProtocolOptions(t *testing.T) {
+ tests := []struct {
+ option ReportOption
+ expected string
+ }{
+ {WithZookeeper(), constant.ZookeeperKey},
+ {WithNacos(), constant.NacosKey},
+ {WithEtcdV3(), constant.EtcdV3Key},
+ }
+ for _, tt := range tests {
+ opts := defaultReportOptions()
+ tt.option(opts)
+ assert.Equal(t, tt.expected, opts.Protocol)
+ }
+}
+
+func TestWithAddressProtocolParsing(t *testing.T) {
+ tests := []struct {
+ address, expectedProtocol string
+ }{
+ {"127.0.0.1:2181", ""},
+ {"zookeeper://127.0.0.1:2181", "zookeeper"},
+ {"nacos://localhost:8848", "nacos"},
+ }
+ for _, tt := range tests {
+ opts := defaultReportOptions()
+ WithAddress(tt.address)(opts)
+ assert.Equal(t, tt.expectedProtocol, opts.Protocol)
+ }
+}
+
+func TestReportOptionsToUrl(t *testing.T) {
+ // Valid options
+ opts := NewReportOptions(WithZookeeper(), WithAddress("127.0.0.1:2181"))
+ url, err := opts.toUrl()
+ require.NoError(t, err)
+ assert.Equal(t, "zookeeper", url.Protocol)
+
+ // Invalid options - empty protocol
+ opts = NewReportOptions(WithAddress("127.0.0.1:2181"))
+ url, err = opts.toUrl()
+ require.Error(t, err)
+ assert.Nil(t, url)
+}
+
+func TestFromRegistry(t *testing.T) {
+ rc := &global.RegistryConfig{
+ Protocol: "zookeeper",
+ Address: "127.0.0.1:2181",
+ Username: "admin",
+ Password: "secret",
+ Group: "dubbo",
+ Namespace: "public",
+ Timeout: "3s",
+ }
+ opts := fromRegistry("zk-registry", rc)
+ assert.Equal(t, "zk-registry", opts.registryId)
+ assert.Equal(t, "zookeeper", opts.Protocol)
+ assert.Equal(t, "3000", opts.Timeout)
+
+ // Invalid timeout
+ rc.Timeout = "invalid"
+ opts = fromRegistry("test", rc)
+ assert.Empty(t, opts.Timeout)
+}
+
+func TestInitRegistryMetadataReport(t *testing.T) {
+ // Empty/nil registries
+ require.NoError(t, InitRegistryMetadataReport(nil))
+ require.NoError(t,
InitRegistryMetadataReport(map[string]*global.RegistryConfig{}))
+
+ // Invalid UseAsMetaReport
+ err := InitRegistryMetadataReport(map[string]*global.RegistryConfig{
+ "zk": {Protocol: "zookeeper", Address: "127.0.0.1:2181",
UseAsMetaReport: "invalid"},
+ })
+ require.Error(t, err)
+}
+
+func TestOptionsOverride(t *testing.T) {
+ opts := NewOptions(WithAppName("app1"), WithAppName("app2"))
+ assert.Equal(t, "app2", opts.appName)
+
+ reportOpts := NewReportOptions(WithZookeeper(), WithNacos(),
WithEtcdV3())
+ assert.Equal(t, constant.EtcdV3Key, reportOpts.Protocol)
+}
diff --git a/metadata/report/zookeeper/listener_test.go
b/metadata/report/zookeeper/listener_test.go
new file mode 100644
index 000000000..8d28269ab
--- /dev/null
+++ b/metadata/report/zookeeper/listener_test.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "sync/atomic"
+ "testing"
+)
+
+import (
+ "github.com/dubbogo/gost/gof/observer"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "golang.org/x/sync/errgroup"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type mockMappingListener struct {
+ eventCount atomic.Int32
+ mu sync.Mutex
+ onEventErr error
+}
+
+func newMockMappingListener() *mockMappingListener { return
&mockMappingListener{} }
+
+func (m *mockMappingListener) OnEvent(e observer.Event) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.eventCount.Add(1)
+ return m.onEventErr
+}
+
+func (m *mockMappingListener) Stop() {}
+
+func (m *mockMappingListener) SetError(err error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.onEventErr = err
+}
+
+// Ensure mockMappingListener implements mapping.MappingListener
+var _ mapping.MappingListener = (*mockMappingListener)(nil)
+
+func TestListenerSet(t *testing.T) {
+ set := NewListenerSet()
+ assert.NotNil(t, set)
+ assert.Empty(t, set.listeners)
+
+ // Add listeners
+ l1, l2 := newMockMappingListener(), newMockMappingListener()
+ set.Add(l1)
+ set.Add(l2)
+ set.Add(l1) // duplicate
+ assert.Len(t, set.listeners, 2)
+
+ // Has
+ assert.True(t, set.Has(l1))
+ assert.False(t, set.Has(newMockMappingListener()))
+
+ // Remove
+ set.Remove(l1)
+ assert.False(t, set.Has(l1))
+ assert.Len(t, set.listeners, 1)
+
+ // ForEach
+ count := 0
+ err := set.ForEach(func(l mapping.MappingListener) error { count++;
return nil })
+ require.NoError(t, err)
+ assert.Equal(t, 1, count)
+
+ // ForEach with error
+ set.Add(newMockMappingListener())
+ expectedErr := errors.New("test")
+ err = set.ForEach(func(l mapping.MappingListener) error { return
expectedErr })
+ assert.Equal(t, expectedErr, err)
+}
+
+func TestListenerSetConcurrency(t *testing.T) {
+ set := NewListenerSet()
+ g, _ := errgroup.WithContext(context.Background())
+
+ for i := 0; i < 50; i++ {
+ g.Go(func() error {
+ set.Add(newMockMappingListener())
+ return nil
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ t.Fatal(err)
+ }
+ assert.Len(t, set.listeners, 50)
+
+}
+
+func TestCacheListener(t *testing.T) {
+ cl := NewCacheListener("/dubbo", nil)
+ assert.NotNil(t, cl)
+ assert.Equal(t, "/dubbo", cl.rootPath)
+
+ // Setup listener
+ key := "/dubbo/mapping/test.service"
+ listenerSet := NewListenerSet()
+ listenerSet.Add(newMockMappingListener())
+ cl.keyListeners.Store(key, listenerSet)
+
+ // DataChange with registered listener
+ event := remoting.Event{Path: key, Action: remoting.EventTypeUpdate,
Content: "app1"}
+ assert.True(t, cl.DataChange(event))
+
+ // DataChange without registered listener
+ event.Path = "/dubbo/mapping/other"
+ assert.False(t, cl.DataChange(event))
+
+ // RemoveListener
+ ml := newMockMappingListener()
+ listenerSet.Add(ml)
+ cl.RemoveListener(key, ml)
+ listeners, _ := cl.keyListeners.Load(key)
+ assert.False(t, listeners.(*ListenerSet).Has(ml))
+}
+
+func TestCacheListenerDataChangeWithError(t *testing.T) {
+ cl := NewCacheListener("/dubbo", nil)
+ key := "/dubbo/mapping/error.service"
+
+ ml := newMockMappingListener()
+ ml.SetError(errors.New("listener error"))
+
+ listenerSet := NewListenerSet()
+ listenerSet.Add(ml)
+ cl.keyListeners.Store(key, listenerSet)
+
+ event := remoting.Event{Path: key, Action: remoting.EventTypeUpdate,
Content: "app1"}
+ assert.False(t, cl.DataChange(event))
+}
+
+func TestCacheListenerPathToKey(t *testing.T) {
+ cl := NewCacheListener("/dubbo", nil)
+ assert.Equal(t, "com.example.Service",
cl.pathToKey("/dubbo/mapping/com.example.Service"))
+ assert.Empty(t, cl.pathToKey(""))
+}
diff --git a/metadata/report/zookeeper/report_test.go
b/metadata/report/zookeeper/report_test.go
new file mode 100644
index 000000000..8bbb47f5a
--- /dev/null
+++ b/metadata/report/zookeeper/report_test.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "encoding/json"
+ "strings"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metadata/info"
+)
+
+func TestMetadataInfoSerialization(t *testing.T) {
+ original := &info.MetadataInfo{
+ App: "test-app",
+ Revision: "1.0.0",
+ Services: map[string]*info.ServiceInfo{
+ "com.example.TestService": {
+ Name: "com.example.TestService", Protocol:
"dubbo",
+ },
+ },
+ }
+
+ data, err := json.Marshal(original)
+ require.NoError(t, err)
+
+ var restored info.MetadataInfo
+ err = json.Unmarshal(data, &restored)
+ require.NoError(t, err)
+ assert.Equal(t, original.App, restored.App)
+ assert.Equal(t, original.Revision, restored.Revision)
+
+ // Invalid JSON
+ err = json.Unmarshal([]byte(`{invalid}`), &restored)
+ require.Error(t, err)
+}
+
+func TestRegisterServiceAppMappingValueMerge(t *testing.T) {
+ tests := []struct {
+ oldValue, newValue, expected string
+ }{
+ {"app1", "app2", "app1,app2"},
+ {"app1,app2", "app1", "app1,app2"},
+ {"", "app1", ",app1"},
+ }
+ for _, tt := range tests {
+ var result string
+ if strings.Contains(tt.oldValue, tt.newValue) {
+ result = tt.oldValue
+ } else {
+ result = tt.oldValue + "," + tt.newValue
+ }
+ assert.Equal(t, tt.expected, result)
+ }
+}
+
+func TestCreateMetadataReportURLParsing(t *testing.T) {
+ tests := []struct {
+ group, expectedRootDir string
+ }{
+ {"", "/dubbo/"},
+ {"custom", "/custom/"},
+ {"/custom", "/custom/"},
+ {"/", "/"},
+ }
+ for _, tt := range tests {
+ url := common.NewURLWithOptions(
+ common.WithProtocol("zookeeper"),
+ common.WithLocation("127.0.0.1:2181"),
+ )
+ if tt.group != "" {
+ url.SetParam(constant.MetadataReportGroupKey, tt.group)
+ }
+ rootDir := url.GetParam(constant.MetadataReportGroupKey,
"dubbo")
+ if len(rootDir) > 0 && rootDir[0] != '/' {
+ rootDir = "/" + rootDir
+ }
+ if rootDir != "/" {
+ rootDir = rootDir + "/"
+ }
+ assert.Equal(t, tt.expectedRootDir, rootDir)
+ }
+}
+
+func TestRemoveServiceAppMappingListener(t *testing.T) {
+ report := &zookeeperMetadataReport{
+ rootDir: "/dubbo/",
+ cacheListener: NewCacheListener("/dubbo/", nil),
+ }
+ err := report.RemoveServiceAppMappingListener("test.service", "mapping")
+ require.NoError(t, err)
+}
+
+func TestCacheListenerIntegrationWithReport(t *testing.T) {
+ cacheListener := NewCacheListener("/dubbo/", nil)
+ report := &zookeeperMetadataReport{
+ rootDir: "/dubbo/",
+ cacheListener: cacheListener,
+ }
+ assert.NotNil(t, report.cacheListener)
+ assert.Equal(t, "/dubbo/", report.rootDir)
+}
diff --git a/remoting/codec_test.go b/remoting/codec_test.go
new file mode 100644
index 000000000..ced9c6ca2
--- /dev/null
+++ b/remoting/codec_test.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remoting
+
+import (
+ "bytes"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type mockCodec struct{}
+
+func (m *mockCodec) EncodeRequest(request *Request) (*bytes.Buffer, error) {
+ return bytes.NewBuffer([]byte("encoded-request")), nil
+}
+
+func (m *mockCodec) EncodeResponse(response *Response) (*bytes.Buffer, error) {
+ return bytes.NewBuffer([]byte("encoded-response")), nil
+}
+
+func (m *mockCodec) Decode(data []byte) (*DecodeResult, int, error) {
+ return &DecodeResult{IsRequest: true, Result: "decoded"}, len(data), nil
+}
+
+// Compile-time check
+var _ Codec = (*mockCodec)(nil)
+
+func TestDecodeResult(t *testing.T) {
+ dr := &DecodeResult{IsRequest: true, Result: "test-data"}
+ assert.True(t, dr.IsRequest)
+ assert.Equal(t, "test-data", dr.Result)
+
+ dr2 := &DecodeResult{IsRequest: false, Result: nil}
+ assert.False(t, dr2.IsRequest)
+ assert.Nil(t, dr2.Result)
+}
+
+func TestRegistryCodec(t *testing.T) {
+ protocol := "test-protocol"
+ mockC := &mockCodec{}
+
+ RegistryCodec(protocol, mockC)
+ retrieved := GetCodec(protocol)
+
+ assert.NotNil(t, retrieved)
+ assert.Equal(t, mockC, retrieved)
+}
+
+func TestGetCodecNotFound(t *testing.T) {
+ result := GetCodec("non-existent-protocol")
+ assert.Nil(t, result)
+}
+
+func TestRegistryCodecOverwrite(t *testing.T) {
+ protocol := "overwrite-test"
+
+ codec1 := &mockCodec{}
+ RegistryCodec(protocol, codec1)
+
+ codec2 := &mockCodec{}
+ RegistryCodec(protocol, codec2)
+
+ assert.Equal(t, codec2, GetCodec(protocol))
+}
+
+func TestCodecInterface(t *testing.T) {
+ mockC := &mockCodec{}
+
+ // EncodeRequest
+ buf, err := mockC.EncodeRequest(&Request{ID: 1, Version: "2.0.2"})
+ require.NoError(t, err)
+ assert.NotNil(t, buf)
+
+ // EncodeResponse
+ buf, err = mockC.EncodeResponse(&Response{ID: 1, Version: "2.0.2"})
+ require.NoError(t, err)
+ assert.NotNil(t, buf)
+
+ // Decode
+ result, length, err := mockC.Decode([]byte("test-data"))
+ require.NoError(t, err)
+ assert.NotNil(t, result)
+ assert.Equal(t, 9, length)
+}
diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go
new file mode 100644
index 000000000..7a8250c2d
--- /dev/null
+++ b/remoting/etcdv3/client_test.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcdv3
+
+import (
+ "sync"
+ "testing"
+ "time"
+)
+
+import (
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+type mockClientFacade struct {
+ client *gxetcd.Client
+ lock sync.Mutex
+ url *common.URL
+ wg sync.WaitGroup
+ done chan struct{}
+}
+
+func (m *mockClientFacade) Client() *gxetcd.Client { return m.client }
+func (m *mockClientFacade) SetClient(c *gxetcd.Client) { m.client = c }
+func (m *mockClientFacade) ClientLock() *sync.Mutex { return &m.lock }
+func (m *mockClientFacade) WaitGroup() *sync.WaitGroup { return &m.wg }
+func (m *mockClientFacade) Done() chan struct{} {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if m.done == nil {
+ m.done = make(chan struct{})
+ }
+ return m.done
+}
+func (m *mockClientFacade) RestartCallBack() bool { return true }
+func (m *mockClientFacade) GetURL() *common.URL { return m.url }
+func (m *mockClientFacade) IsAvailable() bool { return true }
+func (m *mockClientFacade) Destroy() {}
+
+func TestValidateClient(t *testing.T) {
+ // Test with nil client (will fail without real etcd)
+ facade := &mockClientFacade{}
+ err := ValidateClient(facade,
+ gxetcd.WithName("test"),
+ gxetcd.WithEndpoints("127.0.0.1:2379"),
+ gxetcd.WithTimeout(100*time.Millisecond),
+ )
+ require.Error(t, err)
+}
+
+func TestNewServiceDiscoveryClient(t *testing.T) {
+ // Will return nil client without real etcd, but exercises the code
+ client := NewServiceDiscoveryClient(
+ gxetcd.WithName("test"),
+ gxetcd.WithEndpoints("127.0.0.1:2379"),
+ gxetcd.WithTimeout(100*time.Millisecond),
+ )
+ assert.Nil(t, client) // Expected nil without real etcd
+}
diff --git a/remoting/etcdv3/facade_test.go b/remoting/etcdv3/facade_test.go
new file mode 100644
index 000000000..309d1c780
--- /dev/null
+++ b/remoting/etcdv3/facade_test.go
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcdv3
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+// Verify clientFacade interface can be implemented
+type testClientFacade struct {
+ client *gxetcd.Client
+ lock sync.Mutex
+ wg sync.WaitGroup
+ done chan struct{}
+ url *common.URL
+}
+
+func (f *testClientFacade) Client() *gxetcd.Client { return f.client }
+func (f *testClientFacade) SetClient(c *gxetcd.Client) { f.client = c }
+func (f *testClientFacade) ClientLock() *sync.Mutex { return &f.lock }
+func (f *testClientFacade) WaitGroup() *sync.WaitGroup { return &f.wg }
+func (f *testClientFacade) Done() chan struct{} { return f.done }
+func (f *testClientFacade) RestartCallBack() bool { return true }
+func (f *testClientFacade) GetURL() *common.URL { return f.url }
+func (f *testClientFacade) IsAvailable() bool { return true }
+func (f *testClientFacade) Destroy() { close(f.done) }
+
+// Compile-time check
+var _ clientFacade = (*testClientFacade)(nil)
+
+func TestClientFacadeInterface(t *testing.T) {
+ facade := &testClientFacade{done: make(chan struct{})}
+
+ assert.Nil(t, facade.Client())
+ assert.NotNil(t, facade.ClientLock())
+ assert.NotNil(t, facade.WaitGroup())
+ assert.NotNil(t, facade.Done())
+ assert.True(t, facade.RestartCallBack())
+ assert.True(t, facade.IsAvailable())
+
+ facade.SetClient(nil)
+ assert.Nil(t, facade.Client())
+}
diff --git a/remoting/exchange_client_test.go b/remoting/exchange_client_test.go
new file mode 100644
index 000000000..08245e9ab
--- /dev/null
+++ b/remoting/exchange_client_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remoting
+
+import (
+ "errors"
+ "sync"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+type mockClient struct {
+ mu sync.Mutex
+ available bool
+ connectErr error
+ connCount int
+}
+
+func (m *mockClient) SetExchangeClient(client *ExchangeClient) {}
+func (m *mockClient) Close() {}
+func (m *mockClient) IsAvailable() bool { m.mu.Lock();
defer m.mu.Unlock(); return m.available }
+
+func (m *mockClient) Request(request *Request, timeout time.Duration, response
*PendingResponse) error {
+ return nil
+}
+
+func (m *mockClient) Connect(url *common.URL) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.connCount++
+ return m.connectErr
+}
+
+func testURL() *common.URL {
+ return common.NewURLWithOptions(common.WithProtocol("dubbo"),
common.WithIp("127.0.0.1"), common.WithPort("20880"))
+}
+
+func TestNewExchangeClient(t *testing.T) {
+ t.Run("eager init", func(t *testing.T) {
+ m := &mockClient{available: true}
+ ec := NewExchangeClient(testURL(), m, 5*time.Second, false)
+ assert.NotNil(t, ec)
+ assert.Positive(t, m.connCount)
+ })
+
+ t.Run("lazy init", func(t *testing.T) {
+ m := &mockClient{available: true}
+ ec := NewExchangeClient(testURL(), m, 5*time.Second, true)
+ assert.NotNil(t, ec)
+ assert.Equal(t, 0, m.connCount)
+ })
+
+ t.Run("connect fail", func(t *testing.T) {
+ m := &mockClient{connectErr: errors.New("fail")}
+ assert.Nil(t, NewExchangeClient(testURL(), m, 5*time.Second,
false))
+ })
+}
+
+func TestExchangeClientActiveNumber(t *testing.T) {
+ ec := NewExchangeClient(testURL(), &mockClient{available: true},
5*time.Second, true)
+ assert.Equal(t, uint32(1), ec.GetActiveNumber())
+ ec.IncreaseActiveNumber()
+ assert.Equal(t, uint32(2), ec.GetActiveNumber())
+ ec.DecreaseActiveNumber()
+ assert.Equal(t, uint32(1), ec.GetActiveNumber())
+}
+
+func TestExchangeClientClose(t *testing.T) {
+ m := &mockClient{available: true}
+ ec := NewExchangeClient(testURL(), m, 5*time.Second, true)
+ ec.Close()
+ assert.False(t, ec.init)
+}
+
+func TestExchangeClientIsAvailable(t *testing.T) {
+ m := &mockClient{available: true}
+ ec := NewExchangeClient(testURL(), m, 5*time.Second, true)
+ assert.True(t, ec.IsAvailable())
+ m.mu.Lock()
+ m.available = false
+ m.mu.Unlock()
+ assert.False(t, ec.IsAvailable())
+}
diff --git a/remoting/exchange_server_test.go b/remoting/exchange_server_test.go
new file mode 100644
index 000000000..cb10c5cc0
--- /dev/null
+++ b/remoting/exchange_server_test.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remoting
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+type mockServer struct {
+ mu sync.Mutex
+ startCalled int
+ stopCalled int
+}
+
+func (m *mockServer) Start() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.startCalled++
+}
+
+func (m *mockServer) Stop() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.stopCalled++
+}
+
+// Compile-time check
+var _ Server = (*mockServer)(nil)
+
+func TestNewExchangeServer(t *testing.T) {
+ url := common.NewURLWithOptions(
+ common.WithProtocol("dubbo"),
+ common.WithIp("127.0.0.1"),
+ common.WithPort("20880"),
+ )
+ mockS := &mockServer{}
+
+ es := NewExchangeServer(url, mockS)
+ assert.NotNil(t, es)
+ assert.Equal(t, url, es.URL)
+ assert.Equal(t, mockS, es.Server)
+}
+
+func TestExchangeServerStartStop(t *testing.T) {
+ url := common.NewURLWithOptions(
+ common.WithProtocol("dubbo"),
+ common.WithIp("127.0.0.1"),
+ common.WithPort("20880"),
+ )
+ mockS := &mockServer{}
+ es := NewExchangeServer(url, mockS)
+
+ es.Start()
+ assert.Equal(t, 1, mockS.startCalled)
+
+ es.Stop()
+ assert.Equal(t, 1, mockS.stopCalled)
+}
+
+func TestExchangeServerConcurrent(t *testing.T) {
+ url := common.NewURLWithOptions(
+ common.WithProtocol("dubbo"),
+ common.WithIp("127.0.0.1"),
+ common.WithPort("20880"),
+ )
+ mockS := &mockServer{}
+ es := NewExchangeServer(url, mockS)
+
+ var wg sync.WaitGroup
+ for i := 0; i < 50; i++ {
+ wg.Add(2)
+ go func() { defer wg.Done(); es.Start() }()
+ go func() { defer wg.Done(); es.Stop() }()
+ }
+ wg.Wait()
+
+ assert.Equal(t, 50, mockS.startCalled)
+ assert.Equal(t, 50, mockS.stopCalled)
+}
diff --git a/remoting/exchange_test.go b/remoting/exchange_test.go
new file mode 100644
index 000000000..92d7b6746
--- /dev/null
+++ b/remoting/exchange_test.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remoting
+
+import (
+ "errors"
+ "sync"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func TestSequenceID(t *testing.T) {
+ id1, id2 := SequenceID(), SequenceID()
+ assert.Equal(t, int64(2), id2-id1)
+ assert.Equal(t, int64(0), id2%2)
+}
+
+func TestSequenceIDConcurrent(t *testing.T) {
+ var wg sync.WaitGroup
+ ids := make(chan int64, 50)
+ for i := 0; i < 50; i++ {
+ wg.Add(1)
+ go func() { defer wg.Done(); ids <- SequenceID() }()
+ }
+ wg.Wait()
+ close(ids)
+
+ idSet := make(map[int64]bool)
+ for id := range ids {
+ assert.False(t, idSet[id])
+ idSet[id] = true
+ }
+}
+
+func TestNewRequest(t *testing.T) {
+ req := NewRequest("2.0.2")
+ assert.NotNil(t, req)
+ assert.Equal(t, "2.0.2", req.Version)
+ assert.NotEqual(t, int64(0), req.ID)
+}
+
+func TestNewResponse(t *testing.T) {
+ resp := NewResponse(123, "2.0.2")
+ assert.Equal(t, int64(123), resp.ID)
+ assert.Equal(t, "2.0.2", resp.Version)
+}
+
+func TestResponseIsHeartbeat(t *testing.T) {
+ assert.True(t, (&Response{Event: true, Result: nil}).IsHeartbeat())
+ assert.False(t, (&Response{Event: true, Result: "data"}).IsHeartbeat())
+ assert.False(t, (&Response{Event: false, Result: nil}).IsHeartbeat())
+}
+
+func TestResponseString(t *testing.T) {
+ resp := &Response{ID: 123, Version: "2.0.2", Error: errors.New("err")}
+ assert.Contains(t, resp.String(), "123")
+ assert.Contains(t, resp.String(), "err")
+}
+
+func TestNewPendingResponse(t *testing.T) {
+ pr := NewPendingResponse(100)
+ assert.NotNil(t, pr)
+ assert.Equal(t, int64(100), pr.seq)
+ assert.NotNil(t, pr.Done)
+}
+
+func TestPendingResponseSetResponse(t *testing.T) {
+ pr := NewPendingResponse(1)
+ resp := &Response{ID: 1, Result: "result"}
+ pr.SetResponse(resp)
+ assert.Equal(t, resp, pr.response)
+}
+
+func TestPendingResponseGetCallResponse(t *testing.T) {
+ pr := NewPendingResponse(1)
+ pr.Err = errors.New("error")
+ acr := pr.GetCallResponse().(AsyncCallbackResponse)
+ assert.Equal(t, pr.Err, acr.Cause)
+}
+
+func TestAddGetRemovePendingResponse(t *testing.T) {
+ pr := NewPendingResponse(999)
+ AddPendingResponse(pr)
+ assert.Equal(t, pr, GetPendingResponse(SequenceType(999)))
+ assert.Equal(t, pr, removePendingResponse(SequenceType(999)))
+ assert.Nil(t, removePendingResponse(SequenceType(999)))
+}
+
+func TestResponseHandle(t *testing.T) {
+ t.Run("with callback", func(t *testing.T) {
+ pr := NewPendingResponse(777)
+ called := false
+ pr.Callback = func(response common.CallbackResponse) { called =
true }
+ AddPendingResponse(pr)
+ (&Response{ID: 777}).Handle()
+ assert.True(t, called)
+ })
+
+ t.Run("without callback", func(t *testing.T) {
+ pr := NewPendingResponse(666)
+ AddPendingResponse(pr)
+ (&Response{ID: 666, Error: errors.New("err")}).Handle()
+ select {
+ case <-pr.Done:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatal("Done should be closed")
+ }
+ require.Error(t, pr.Err)
+ })
+}
diff --git a/remoting/getty/opentracing_test.go
b/remoting/getty/opentracing_test.go
new file mode 100644
index 000000000..78dcaf578
--- /dev/null
+++ b/remoting/getty/opentracing_test.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+func TestFilterContext(t *testing.T) {
+ attachments := map[string]any{
+ "string-key": "string-value",
+ "int-key": 123,
+ "bool-key": true,
+ }
+
+ result := filterContext(attachments)
+
+ assert.Len(t, result, 1)
+ assert.Equal(t, "string-value", result["string-key"])
+}
+
+func TestFillTraceAttachments(t *testing.T) {
+ attachments := map[string]any{"existing": "value"}
+ traceAttachment := map[string]string{"trace-id": "123", "span-id":
"456"}
+
+ fillTraceAttachments(attachments, traceAttachment)
+
+ assert.Equal(t, "value", attachments["existing"])
+ assert.Equal(t, "123", attachments["trace-id"])
+ assert.Equal(t, "456", attachments["span-id"])
+}
diff --git a/remoting/getty/pool_test.go b/remoting/getty/pool_test.go
new file mode 100644
index 000000000..2bb8c5a4b
--- /dev/null
+++ b/remoting/getty/pool_test.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package getty
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestGettyRPCClientUpdateActive(t *testing.T) {
+ client := &gettyRPCClient{}
+ client.updateActive(1234567890)
+ assert.Equal(t, int64(1234567890), atomic.LoadInt64(&client.active))
+
+ client.updateActive(0)
+ assert.Equal(t, int64(0), atomic.LoadInt64(&client.active))
+}
+
+func TestGettyRPCClientSelectSession(t *testing.T) {
+ client := &gettyRPCClient{sessions: nil}
+ assert.Nil(t, client.selectSession())
+
+ client.sessions = []*rpcSession{}
+ assert.Nil(t, client.selectSession())
+}
+
+func TestGettyRPCClientSessionOperations(t *testing.T) {
+ client := &gettyRPCClient{}
+
+ // Remove/update nil session should not panic
+ client.removeSession(nil)
+ client.updateSession(nil)
+
+ // Get from nil sessions
+ _, err := client.getClientRpcSession(nil)
+ assert.Equal(t, errClientClosed, err)
+
+ // Session not found
+ client.sessions = []*rpcSession{}
+ _, err = client.getClientRpcSession(nil)
+ assert.Contains(t, err.Error(), "session not exist")
+}
+
+func TestGettyRPCClientIsAvailable(t *testing.T) {
+ client := &gettyRPCClient{sessions: nil}
+ assert.False(t, client.isAvailable())
+
+ client.sessions = []*rpcSession{}
+ assert.False(t, client.isAvailable())
+}
+
+func TestGettyRPCClientClose(t *testing.T) {
+ client := &gettyRPCClient{sessions: []*rpcSession{}}
+ require.NoError(t, client.close())
+ err := client.close()
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "close gettyRPCClient")
+ assert.Contains(t, err.Error(), "again")
+}
+
+func TestGettyRPCClientConcurrent(t *testing.T) {
+ client := &gettyRPCClient{sessions: []*rpcSession{}}
+ var wg sync.WaitGroup
+
+ for i := 0; i < 50; i++ {
+ wg.Add(1)
+ go func(val int64) {
+ defer wg.Done()
+ client.updateActive(val)
+ _ = client.selectSession()
+ _ = client.isAvailable()
+ }(int64(i))
+ }
+ wg.Wait()
+}
+
+func TestRpcSession(t *testing.T) {
+ s := &rpcSession{reqNum: 0}
+
+ s.AddReqNum(5)
+ assert.Equal(t, int32(5), s.GetReqNum())
+
+ s.AddReqNum(3)
+ assert.Equal(t, int32(8), s.GetReqNum())
+
+ s.AddReqNum(-3)
+ assert.Equal(t, int32(5), s.GetReqNum())
+}
+
+func TestRpcSessionConcurrent(t *testing.T) {
+ s := &rpcSession{}
+ var wg sync.WaitGroup
+
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s.AddReqNum(1)
+ }()
+ }
+ wg.Wait()
+ assert.Equal(t, int32(100), s.GetReqNum())
+}
+
+func TestGettyRPCClientLifecycle(t *testing.T) {
+ client := &gettyRPCClient{addr: "127.0.0.1:20880", sessions:
[]*rpcSession{}}
+
+ assert.False(t, client.isAvailable())
+ assert.Equal(t, int64(0), atomic.LoadInt64(&client.active))
+
+ client.updateActive(1234567890)
+ assert.Equal(t, int64(1234567890), atomic.LoadInt64(&client.active))
+
+ require.NoError(t, client.close())
+ assert.Equal(t, int64(0), atomic.LoadInt64(&client.active))
+}
diff --git a/remoting/listener_test.go b/remoting/listener_test.go
new file mode 100644
index 000000000..f4b4ea440
--- /dev/null
+++ b/remoting/listener_test.go
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remoting
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+const (
+ testPath = "/dubbo/services/test"
+ testContent = "test-content"
+)
+
+type mockDataListener struct {
+ mu sync.Mutex
+ events []Event
+}
+
+func (m *mockDataListener) DataChange(event Event) bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.events = append(m.events, event)
+ return true
+}
+
+// Compile-time check
+var _ DataListener = (*mockDataListener)(nil)
+
+func TestEventType(t *testing.T) {
+ assert.Equal(t, "add", EventTypeAdd.String())
+ assert.Equal(t, "delete", EventTypeDel.String())
+ assert.Equal(t, "update", EventTypeUpdate.String())
+
+ assert.Equal(t, EventTypeAdd, EventType(0))
+ assert.Equal(t, EventTypeDel, EventType(1))
+ assert.Equal(t, EventTypeUpdate, EventType(2))
+}
+
+func TestEvent(t *testing.T) {
+ event := Event{Path: testPath, Action: EventTypeAdd, Content:
testContent}
+
+ assert.Equal(t, testPath, event.Path)
+ assert.Equal(t, EventTypeAdd, event.Action)
+ assert.Equal(t, testContent, event.Content)
+
+ str := event.String()
+ assert.Contains(t, str, "add")
+ assert.Contains(t, str, testContent)
+}
+
+func TestDataListener(t *testing.T) {
+ listener := &mockDataListener{}
+
+ events := []Event{
+ {Path: testPath, Action: EventTypeAdd, Content: testContent},
+ {Path: testPath, Action: EventTypeDel, Content: ""},
+ {Path: testPath, Action: EventTypeUpdate, Content: "updated"},
+ }
+
+ for _, event := range events {
+ assert.True(t, listener.DataChange(event))
+ }
+
+ assert.Len(t, listener.events, 3)
+ assert.Equal(t, events, listener.events)
+}
+
+func TestDataListenerConcurrent(t *testing.T) {
+ listener := &mockDataListener{}
+ var wg sync.WaitGroup
+
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ listener.DataChange(Event{Path: testPath, Action:
EventType(idx % 3), Content: testContent})
+ }(i)
+ }
+
+ wg.Wait()
+ assert.Len(t, listener.events, 100)
+}
diff --git a/remoting/zookeeper/client_test.go
b/remoting/zookeeper/client_test.go
new file mode 100644
index 000000000..dd20abd58
--- /dev/null
+++ b/remoting/zookeeper/client_test.go
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "sync"
+ "testing"
+ "time"
+)
+
+import (
+ gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func TestConstants(t *testing.T) {
+ assert.Equal(t, 3, ConnDelay)
+ assert.Equal(t, 3, MaxFailTimes)
+}
+
+// Mock for ValidateZookeeperClient test
+type mockZkClientFacade struct {
+ client *gxzookeeper.ZookeeperClient
+ lock sync.Mutex
+ url *common.URL
+ done chan struct{}
+}
+
+func (m *mockZkClientFacade) ZkClient() *gxzookeeper.ZookeeperClient {
return m.client }
+func (m *mockZkClientFacade) SetZkClient(c *gxzookeeper.ZookeeperClient) {
m.client = c }
+func (m *mockZkClientFacade) ZkClientLock() *sync.Mutex {
return &m.lock }
+func (m *mockZkClientFacade) WaitGroup() *sync.WaitGroup {
return &sync.WaitGroup{} }
+func (m *mockZkClientFacade) Done() chan struct{} {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if m.done == nil {
+ m.done = make(chan struct{})
+ }
+ return m.done
+}
+func (m *mockZkClientFacade) RestartCallBack() bool { return true }
+func (m *mockZkClientFacade) GetURL() *common.URL { return m.url }
+
+func TestValidateZookeeperClient(t *testing.T) {
+ // Test with invalid address (will fail to connect but exercises the
code path)
+ facade := &mockZkClientFacade{
+ url: common.NewURLWithOptions(
+ common.WithParamsValue("config.timeout", "100ms"),
+ ),
+ }
+
+ err := ValidateZookeeperClient(facade, "test")
+ require.Error(t, err) // Expected to fail without real zk
+
+ // Test with existing client (should skip creation)
+ facade2 := &mockZkClientFacade{
+ client: &gxzookeeper.ZookeeperClient{},
+ url: common.NewURLWithOptions(),
+ }
+ err = ValidateZookeeperClient(facade2, "test")
+ require.NoError(t, err)
+}
+
+func TestValidateZookeeperClientConcurrent(t *testing.T) {
+ facade := &mockZkClientFacade{
+ url: common.NewURLWithOptions(
+ common.WithParamsValue("config.timeout", "50ms"),
+ ),
+ }
+
+ var wg sync.WaitGroup
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ ValidateZookeeperClient(facade, "concurrent-test")
+ }()
+ }
+
+ done := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(5 * time.Second):
+ t.Fatal("concurrent test timeout")
+ }
+}
diff --git a/remoting/zookeeper/curator_discovery/service_discovery_test.go
b/remoting/zookeeper/curator_discovery/service_discovery_test.go
new file mode 100644
index 000000000..8ea262e00
--- /dev/null
+++ b/remoting/zookeeper/curator_discovery/service_discovery_test.go
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+const (
+ testBasePath = "/dubbo/services"
+ testServiceName = "test-service"
+ testInstanceID = "instance-1"
+)
+
+func TestEntry(t *testing.T) {
+ t.Run("basic operations", func(t *testing.T) {
+ entry := &Entry{}
+ assert.Nil(t, entry.instance)
+
+ entry.Lock()
+ entry.instance = &ServiceInstance{Name: testServiceName}
+ entry.Unlock()
+ assert.Equal(t, testServiceName, entry.instance.Name)
+ })
+
+ t.Run("concurrent access", func(t *testing.T) {
+ entry := &Entry{instance: &ServiceInstance{Name:
testServiceName}}
+ var wg sync.WaitGroup
+
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ entry.Lock()
+ entry.instance.Name = "updated"
+ entry.Unlock()
+ }()
+ }
+ wg.Wait()
+ assert.Equal(t, "updated", entry.instance.Name)
+ })
+}
+
+func TestNewServiceDiscovery(t *testing.T) {
+ for _, basePath := range []string{testBasePath, "/", ""} {
+ sd := NewServiceDiscovery(nil, basePath)
+ assert.NotNil(t, sd)
+ assert.Equal(t, basePath, sd.basePath)
+ assert.NotNil(t, sd.mutex)
+ assert.NotNil(t, sd.services)
+ }
+}
+
+func TestServiceDiscoveryPathForInstance(t *testing.T) {
+ tests := []struct {
+ basePath, serviceName, instanceID, expected string
+ }{
+ {testBasePath, testServiceName, testInstanceID, testBasePath +
"/" + testServiceName + "/" + testInstanceID},
+ {"/", "service", "id", "/service/id"},
+ {"", "service", "id", "service/id"},
+ }
+
+ for _, tt := range tests {
+ sd := NewServiceDiscovery(nil, tt.basePath)
+ assert.Equal(t, tt.expected, sd.pathForInstance(tt.serviceName,
tt.instanceID))
+ }
+}
+
+func TestServiceDiscoveryPathForName(t *testing.T) {
+ tests := []struct {
+ basePath, serviceName, expected string
+ }{
+ {testBasePath, testServiceName, testBasePath + "/" +
testServiceName},
+ {"/", "service", "/service"},
+ {"", "service", "service"},
+ }
+
+ for _, tt := range tests {
+ sd := NewServiceDiscovery(nil, tt.basePath)
+ assert.Equal(t, tt.expected, sd.pathForName(tt.serviceName))
+ }
+}
+
+func TestServiceDiscoveryGetNameAndID(t *testing.T) {
+ tests := []struct {
+ name, basePath, path, expectedName, expectedID string
+ wantErr bool
+ }{
+ {"valid path", testBasePath, testBasePath + "/" +
testServiceName + "/" + testInstanceID, testServiceName, testInstanceID, false},
+ {"missing id", "/dubbo", "/dubbo/service", "", "", true},
+ {"empty path", "/dubbo", "", "", "", true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ sd := NewServiceDiscovery(nil, tt.basePath)
+ name, id, err := sd.getNameAndID(tt.path)
+ if tt.wantErr {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ assert.Equal(t, tt.expectedName, name)
+ assert.Equal(t, tt.expectedID, id)
+ }
+ })
+ }
+}
+
+func TestServiceDiscoveryDataChange(t *testing.T) {
+ sd := NewServiceDiscovery(nil, testBasePath)
+ testPath := testBasePath + "/test/" + testInstanceID
+
+ for _, eventType := range []remoting.EventType{remoting.EventTypeAdd,
remoting.EventTypeUpdate, remoting.EventTypeDel} {
+ event := remoting.Event{Path: testPath, Action: eventType,
Content: "content"}
+ assert.True(t, sd.DataChange(event))
+ }
+
+ // Invalid path
+ assert.True(t, sd.DataChange(remoting.Event{Path: testBasePath +
"/only-name"}))
+}
+
+func TestServiceDiscoveryClose(t *testing.T) {
+ sd := &ServiceDiscovery{client: nil, listener: nil, services:
&sync.Map{}, mutex: &sync.Mutex{}}
+ sd.Close() // Should not panic
+}
+
+func TestServiceDiscoveryServicesMap(t *testing.T) {
+ sd := NewServiceDiscovery(nil, testBasePath)
+
+ entry := &Entry{instance: &ServiceInstance{Name: testServiceName, ID:
testInstanceID}}
+ sd.services.Store(testInstanceID, entry)
+
+ value, ok := sd.services.Load(testInstanceID)
+ assert.True(t, ok)
+ assert.Equal(t, testServiceName, value.(*Entry).instance.Name)
+
+ sd.services.Delete(testInstanceID)
+ _, ok = sd.services.Load(testInstanceID)
+ assert.False(t, ok)
+}
+
+func TestServiceDiscoveryUpdateService(t *testing.T) {
+ sd := NewServiceDiscovery(nil, testBasePath)
+
+ // Update non-existent
+ err := sd.UpdateService(&ServiceInstance{Name: testServiceName, ID:
"non-existent"})
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "not registered")
+
+ // Update with invalid entry type
+ sd.services.Store("invalid-id", "not-an-entry")
+ err = sd.UpdateService(&ServiceInstance{Name: testServiceName, ID:
"invalid-id"})
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "not entry")
+}
+
+func TestServiceDiscoveryUnregisterService(t *testing.T) {
+ sd := NewServiceDiscovery(nil, testBasePath)
+
+ err := sd.UnregisterService(&ServiceInstance{Name: testServiceName, ID:
"non-existent"})
+ require.NoError(t, err)
+}
+
+func TestServiceDiscoveryConcurrentAccess(t *testing.T) {
+ sd := NewServiceDiscovery(nil, testBasePath)
+ var wg sync.WaitGroup
+
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ //Confirm ID is equal
+ entry := &Entry{instance: &ServiceInstance{Name:
testServiceName, ID: fmt.Sprintf("id-%d", idx)}}
+ sd.services.Store(entry.instance.ID, entry)
+ }(i)
+ }
+ wg.Wait()
+
+ // test whether 100 datas is inserted
+ count := 0
+ sd.services.Range(func(_, _ any) bool {
+ count++
+ return true
+ })
+ assert.Equal(t, 100, count)
+}
diff --git a/remoting/zookeeper/facade_test.go
b/remoting/zookeeper/facade_test.go
new file mode 100644
index 000000000..45ca97d57
--- /dev/null
+++ b/remoting/zookeeper/facade_test.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+// Verify ZkClientFacade interface can be implemented
+type testZkClientFacade struct {
+ client *gxzookeeper.ZookeeperClient
+ lock sync.Mutex
+ wg sync.WaitGroup
+ done chan struct{}
+ url *common.URL
+}
+
+func (f *testZkClientFacade) ZkClient() *gxzookeeper.ZookeeperClient {
return f.client }
+func (f *testZkClientFacade) SetZkClient(c *gxzookeeper.ZookeeperClient) {
f.client = c }
+func (f *testZkClientFacade) ZkClientLock() *sync.Mutex {
return &f.lock }
+func (f *testZkClientFacade) WaitGroup() *sync.WaitGroup {
return &f.wg }
+func (f *testZkClientFacade) Done() chan struct{} {
return f.done }
+func (f *testZkClientFacade) RestartCallBack() bool {
return true }
+func (f *testZkClientFacade) GetURL() *common.URL {
return f.url }
+
+// Compile-time check that testZkClientFacade implements ZkClientFacade
+var _ ZkClientFacade = (*testZkClientFacade)(nil)
+
+func TestZkClientFacadeInterface(t *testing.T) {
+ facade := &testZkClientFacade{done: make(chan struct{})}
+
+ assert.Nil(t, facade.ZkClient())
+ assert.NotNil(t, facade.ZkClientLock())
+ assert.NotNil(t, facade.WaitGroup())
+ assert.NotNil(t, facade.Done())
+ assert.True(t, facade.RestartCallBack())
+ assert.Nil(t, facade.GetURL())
+
+ facade.SetZkClient(nil)
+ assert.Nil(t, facade.ZkClient())
+}