This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 8b9df288 fix(test): resolve data race in nacos registry and session manager tests (#789)
8b9df288 is described below
commit 8b9df28852a2b4d951056c75b8bed482d02da6da
Author: dubbo-go-bot <[email protected]>
AuthorDate: Sun Nov 2 21:42:51 2025 +0800
fix(test): resolve data race in nacos registry and session manager tests (#789)
* chore: remove unnecessary pull_request_target event from GitHub Actions
* fix(test): resolve data race in nacos registry and session manager tests
---------
Co-authored-by: Young <[email protected]>
---
.../mcpserver/registry/nacos/client_test.go | 69 +++++++++++++++++-----
.../mcp/mcpserver/transport/session_manager.go | 16 ++++-
.../mcpserver/transport/session_manager_test.go | 14 ++++-
pkg/filter/mcp/mcpserver/transport/sse_handler.go | 4 ++
4 files changed, 83 insertions(+), 20 deletions(-)
diff --git a/pkg/adapter/mcpserver/registry/nacos/client_test.go b/pkg/adapter/mcpserver/registry/nacos/client_test.go
index 2b89760d..5a74e74f 100644
--- a/pkg/adapter/mcpserver/registry/nacos/client_test.go
+++ b/pkg/adapter/mcpserver/registry/nacos/client_test.go
@@ -22,6 +22,7 @@ import (
"regexp"
"sort"
"strings"
+ "sync"
"testing"
"time"
)
@@ -116,9 +117,10 @@ func createBrokenJSON() string {
type MockedNacosConfigClient struct {
configs map[string]any
configListenerMap map[string][]func(string, string, string, string)
+ mu sync.RWMutex // Protects configListenerMap
}
-func (m MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string,
error) {
+func (m *MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string,
error) {
if result, exist := m.configs[param.DataId+"$$"+param.Group]; exist {
config, ok := result.(string)
if ok {
@@ -135,17 +137,20 @@ func (m MockedNacosConfigClient) GetConfig(param
vo.ConfigParam) (string, error)
return "", nil
}
-func (m MockedNacosConfigClient) PublishConfig(_ vo.ConfigParam) (bool, error)
{
+func (m *MockedNacosConfigClient) PublishConfig(_ vo.ConfigParam) (bool,
error) {
//TODO implement me
panic("implement me")
}
-func (m MockedNacosConfigClient) DeleteConfig(_ vo.ConfigParam) (bool, error) {
+func (m *MockedNacosConfigClient) DeleteConfig(_ vo.ConfigParam) (bool, error)
{
//TODO implement me
panic("implement me")
}
-func (m MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err
error) {
+func (m *MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err
error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
if _, ok := m.configListenerMap[params.Group]; !ok {
m.configListenerMap[params.Group] = []func(string, string,
string, string){}
}
@@ -153,12 +158,15 @@ func (m MockedNacosConfigClient) ListenConfig(params
vo.ConfigParam) (err error)
return nil
}
-func (m MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam)
(err error) {
+func (m *MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam)
(err error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
delete(m.configListenerMap, params.DataId+"$$"+params.Group)
return nil
}
-func (m MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam)
(*model.ConfigPage, error) {
+func (m *MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam)
(*model.ConfigPage, error) {
dataIdRegex := strings.ReplaceAll(param.DataId, "*", ".*")
groupRegex := strings.ReplaceAll(param.Group, "*", ".*")
result := []model.ConfigItem{}
@@ -194,7 +202,7 @@ func (m MockedNacosConfigClient) SearchConfig(param
vo.SearchConfigParam) (*mode
}, nil
}
-func (m MockedNacosConfigClient) CloseClient() {
+func (m *MockedNacosConfigClient) CloseClient() {
//TODO implement me
panic("implement me")
}
@@ -288,7 +296,7 @@ func TestNacosRegistryClient_ListMcpServer(t *testing.T) {
}
client := NacosRegistryClient{
- configClient: MockedNacosConfigClient{configs: mockedConfigs},
+ configClient: &MockedNacosConfigClient{configs: mockedConfigs},
}
servers, err := client.ListMcpServer()
@@ -334,7 +342,7 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
serverConfigKey113 := fmt.Sprintf("%s-%s-mcp-server.json%smcp-server",
testMcpServerID, testVersion113, configKeySeparator)
toolsConfigKey113 := fmt.Sprintf("%s-%s-mcp-tools.json%smcp-tools",
testMcpServerID, testVersion113, configKeySeparator)
- configClient := MockedNacosConfigClient{
+ configClient := &MockedNacosConfigClient{
configs: map[string]any{
versionConfigKey:
createExploreServerVersionConfig(testVersion112),
serverConfigKey112:
createMcpServerConfig(testMcpServerID, testVersion112, testServiceName),
@@ -366,15 +374,24 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Set up listener for configuration changes
var newConfig *McpServerConfig
+ var configMutex sync.Mutex
err = client.ListenToMcpServer(testMcpServerID, func(info
*McpServerConfig) {
+ configMutex.Lock()
newConfig = info
+ configMutex.Unlock()
})
if err != nil {
t.Fatalf("Failed to start listening to MCP server: %v", err)
}
// Wait for initial configuration to be loaded
- for i := 0; i < testRetryMaxAttempts && newConfig == nil; i++ {
+ for i := 0; i < testRetryMaxAttempts; i++ {
+ configMutex.Lock()
+ cfg := newConfig
+ configMutex.Unlock()
+ if cfg != nil {
+ break
+ }
time.Sleep(testRetryInterval)
}
@@ -384,19 +401,26 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Replace nacos template with processed version
expectedToolsConfig = strings.ReplaceAll(expectedToolsConfig,
fmt.Sprintf("${nacos.%s/%s}", testConfigKey, testConfigKey),
fmt.Sprintf(".config.credentials.%s", testCredentialKey))
+ configMutex.Lock()
assert.Equal(t, expectedServerConfig, newConfig.ServerSpecConfig)
assert.Equal(t, expectedToolsConfig, newConfig.ToolsSpecConfig)
assert.Equal(t, 1, len(newConfig.Credentials))
assert.Equal(t, map[string]any{"key": testSecretKey},
newConfig.Credentials[testCredentialKey])
+ configMutex.Unlock()
// Test case 1: Change tool nacos template reference
+ configClient.mu.RLock()
listener := configClient.configListenerMap[toolsConfigKey112][0]
+ configClient.mu.RUnlock()
updatedToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s",
testConfigKey1, testConfigKey1))
listener(testNamespace, "mcp-tools", toolsConfigKey112,
updatedToolsConfig)
// Wait for tools update to propagate
for i := 0; i < testRetryMaxAttempts; i++ {
- if newConfig != nil &&
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey1) {
+ configMutex.Lock()
+ cfg := newConfig
+ configMutex.Unlock()
+ if cfg != nil && strings.Contains(cfg.ToolsSpecConfig,
testCredentialKey1) {
break
}
time.Sleep(testRetryInterval)
@@ -404,30 +428,42 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Verify updated tools configuration
expectedUpdatedToolsConfig := strings.ReplaceAll(updatedToolsConfig,
fmt.Sprintf("${nacos.%s/%s}", testConfigKey1, testConfigKey1),
fmt.Sprintf(".config.credentials.%s", testCredentialKey1))
+ configMutex.Lock()
assert.Equal(t, expectedUpdatedToolsConfig, newConfig.ToolsSpecConfig)
assert.Equal(t, 1, len(newConfig.Credentials))
assert.Equal(t, map[string]any{"key": testSecretKey1},
newConfig.Credentials[testCredentialKey1])
+ configMutex.Unlock()
// Test case 2: Change backend service name
+ configClient.mu.RLock()
serviceListener := configClient.configListenerMap[serverConfigKey112][0]
+ configClient.mu.RUnlock()
updatedServerConfig := createMcpServerConfig(testMcpServerID,
testVersion112, testServiceNameNew)
serviceListener(testNamespace, "mcp-server", serverConfigKey112,
updatedServerConfig)
for i := 0; i < testRetryMaxAttempts; i++ {
- if newConfig != nil &&
strings.Contains(newConfig.ServerSpecConfig, testServiceNameNew) {
+ configMutex.Lock()
+ cfg := newConfig
+ configMutex.Unlock()
+ if cfg != nil && strings.Contains(cfg.ServerSpecConfig,
testServiceNameNew) {
break
}
time.Sleep(testRetryInterval)
}
// Test case 3: Publish new version of MCP server
+ configClient.mu.RLock()
versionListener := configClient.configListenerMap[versionConfigKey][0]
+ configClient.mu.RUnlock()
updatedVersionConfig := createExploreServerVersionConfig(testVersion113)
versionListener(testNamespace, testGroupNameMcpVersions,
versionConfigKey, updatedVersionConfig)
// Wait for version update to trigger server config change
for i := 0; i < testRetryMaxAttempts; i++ {
- if newConfig != nil &&
strings.Contains(newConfig.ServerSpecConfig, fmt.Sprintf("\"version\":\"%s\"",
testVersion113)) {
+ configMutex.Lock()
+ cfg := newConfig
+ configMutex.Unlock()
+ if cfg != nil && strings.Contains(cfg.ServerSpecConfig,
fmt.Sprintf("\"version\":\"%s\"", testVersion113)) {
break
}
time.Sleep(testRetryInterval)
@@ -435,7 +471,10 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Wait for tools config to update to new version reference
for i := 0; i < testRetryMaxAttempts; i++ {
- if newConfig != nil &&
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey3) {
+ configMutex.Lock()
+ cfg := newConfig
+ configMutex.Unlock()
+ if cfg != nil && strings.Contains(cfg.ToolsSpecConfig,
testCredentialKey3) {
break
}
time.Sleep(testRetryInterval)
@@ -446,8 +485,10 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
expectedFinalToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s",
testConfigKey3, testConfigKey3))
expectedFinalToolsConfig = strings.ReplaceAll(expectedFinalToolsConfig,
fmt.Sprintf("${nacos.%s/%s}", testConfigKey3, testConfigKey3),
fmt.Sprintf(".config.credentials.%s", testCredentialKey3))
+ configMutex.Lock()
assert.Equal(t, expectedFinalServerConfig, newConfig.ServerSpecConfig)
assert.Equal(t, expectedFinalToolsConfig, newConfig.ToolsSpecConfig)
assert.Equal(t, 1, len(newConfig.Credentials))
assert.Equal(t, map[string]any{"key": testSecretKey3},
newConfig.Credentials[testCredentialKey3])
+ configMutex.Unlock()
}
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager.go b/pkg/filter/mcp/mcpserver/transport/session_manager.go
index 1eee222e..ddc8eda4 100644
--- a/pkg/filter/mcp/mcpserver/transport/session_manager.go
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager.go
@@ -42,6 +42,7 @@ type MCPSession struct {
LastActivity time.Time
PipeWriter *io.PipeWriter // Pipe writer for sending SSE messages
Done chan struct{}
+ mu sync.RWMutex // Protects LastActivity field
}
// SessionManager manages MCP sessions for SSE connections
@@ -70,7 +71,10 @@ func (sm *SessionManager) EnsureSession(sessionIDHeader
string) (*MCPSession, bo
// Try to get existing session
if sessionIDHeader != "" {
if session, exists := sm.sessions[sessionIDHeader]; exists {
+ // Update LastActivity using session's own mutex
+ session.mu.Lock()
session.LastActivity = time.Now()
+ session.mu.Unlock()
return session, false // existing session
}
}
@@ -92,11 +96,13 @@ func (sm *SessionManager) EnsureSession(sessionIDHeader
string) (*MCPSession, bo
// Session retrieves a session by ID
func (sm *SessionManager) Session(sessionID string) (*MCPSession, bool) {
sm.mu.RLock()
- defer sm.mu.RUnlock()
-
session, exists := sm.sessions[sessionID]
+ sm.mu.RUnlock()
+
if exists {
+ session.mu.Lock()
session.LastActivity = time.Now()
+ session.mu.Unlock()
}
return session, exists
}
@@ -168,7 +174,11 @@ func (sm *SessionManager) cleanupExpiredSessions() {
var toRemove []string
for sessionID, session := range sm.sessions {
- if now.Sub(session.LastActivity) > SessionTimeout {
+ session.mu.RLock()
+ lastActivity := session.LastActivity
+ session.mu.RUnlock()
+
+ if now.Sub(lastActivity) > SessionTimeout {
toRemove = append(toRemove, sessionID)
}
}
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager_test.go b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
index 93acc5c4..4b888d66 100644
--- a/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
@@ -79,7 +79,15 @@ func TestEnsureSession_ReuseExisting(t *testing.T) {
}
// Verify last activity was updated
- if session2.LastActivity.Before(session1.LastActivity) {
+ session1.mu.RLock()
+ lastActivity1 := session1.LastActivity
+ session1.mu.RUnlock()
+
+ session2.mu.RLock()
+ lastActivity2 := session2.LastActivity
+ session2.mu.RUnlock()
+
+ if lastActivity2.Before(lastActivity1) {
t.Error("LastActivity should be updated")
}
}
@@ -152,9 +160,9 @@ func TestSessionCleanup(t *testing.T) {
sessionID := session.ID
// Manually set old LastActivity to simulate timeout
- sm.mu.Lock()
+ session.mu.Lock()
session.LastActivity = time.Now().Add(-SessionTimeout - 1*time.Minute)
- sm.mu.Unlock()
+ session.mu.Unlock()
// Trigger cleanup
sm.cleanupExpiredSessions()
diff --git a/pkg/filter/mcp/mcpserver/transport/sse_handler.go b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
index 166a9f95..70a3329d 100644
--- a/pkg/filter/mcp/mcpserver/transport/sse_handler.go
+++ b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
@@ -60,7 +60,11 @@ func (h *SSEHandler) SendSSEMessage(session *MCPSession,
message any) error {
return fmt.Errorf("failed to write to SSE pipe: %w", err)
}
+ // Update LastActivity using session's mutex
+ session.mu.Lock()
session.LastActivity = time.Now()
+ session.mu.Unlock()
+
logger.Debugf("[dubbo-go-pixiu] mcp server sent SSE message to session:
%s", session.ID)
return nil
}