This is an automated email from the ASF dual-hosted git repository.

xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8b9df288 fix(test): resolve data race in nacos registry and session 
manager tests (#789)
8b9df288 is described below

commit 8b9df28852a2b4d951056c75b8bed482d02da6da
Author: dubbo-go-bot <[email protected]>
AuthorDate: Sun Nov 2 21:42:51 2025 +0800

    fix(test): resolve data race in nacos registry and session manager tests 
(#789)
    
    * chore: remove unnecessary pull_request_target event from GitHub Actions
    
    * fix(test): resolve data race in nacos registry and session manager tests
    
    ---------
    
    Co-authored-by: Young <[email protected]>
---
 .../mcpserver/registry/nacos/client_test.go        | 69 +++++++++++++++++-----
 .../mcp/mcpserver/transport/session_manager.go     | 16 ++++-
 .../mcpserver/transport/session_manager_test.go    | 14 ++++-
 pkg/filter/mcp/mcpserver/transport/sse_handler.go  |  4 ++
 4 files changed, 83 insertions(+), 20 deletions(-)

diff --git a/pkg/adapter/mcpserver/registry/nacos/client_test.go 
b/pkg/adapter/mcpserver/registry/nacos/client_test.go
index 2b89760d..5a74e74f 100644
--- a/pkg/adapter/mcpserver/registry/nacos/client_test.go
+++ b/pkg/adapter/mcpserver/registry/nacos/client_test.go
@@ -22,6 +22,7 @@ import (
        "regexp"
        "sort"
        "strings"
+       "sync"
        "testing"
        "time"
 )
@@ -116,9 +117,10 @@ func createBrokenJSON() string {
 type MockedNacosConfigClient struct {
        configs           map[string]any
        configListenerMap map[string][]func(string, string, string, string)
+       mu                sync.RWMutex // Protects configListenerMap
 }
 
-func (m MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string, 
error) {
+func (m *MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string, 
error) {
        if result, exist := m.configs[param.DataId+"$$"+param.Group]; exist {
                config, ok := result.(string)
                if ok {
@@ -135,17 +137,20 @@ func (m MockedNacosConfigClient) GetConfig(param 
vo.ConfigParam) (string, error)
        return "", nil
 }
 
-func (m MockedNacosConfigClient) PublishConfig(_ vo.ConfigParam) (bool, error) 
{
+func (m *MockedNacosConfigClient) PublishConfig(_ vo.ConfigParam) (bool, 
error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosConfigClient) DeleteConfig(_ vo.ConfigParam) (bool, error) {
+func (m *MockedNacosConfigClient) DeleteConfig(_ vo.ConfigParam) (bool, error) 
{
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err 
error) {
+func (m *MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err 
error) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
        if _, ok := m.configListenerMap[params.Group]; !ok {
                m.configListenerMap[params.Group] = []func(string, string, 
string, string){}
        }
@@ -153,12 +158,15 @@ func (m MockedNacosConfigClient) ListenConfig(params 
vo.ConfigParam) (err error)
        return nil
 }
 
-func (m MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam) 
(err error) {
+func (m *MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam) 
(err error) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
        delete(m.configListenerMap, params.DataId+"$$"+params.Group)
        return nil
 }
 
-func (m MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam) 
(*model.ConfigPage, error) {
+func (m *MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam) 
(*model.ConfigPage, error) {
        dataIdRegex := strings.ReplaceAll(param.DataId, "*", ".*")
        groupRegex := strings.ReplaceAll(param.Group, "*", ".*")
        result := []model.ConfigItem{}
@@ -194,7 +202,7 @@ func (m MockedNacosConfigClient) SearchConfig(param 
vo.SearchConfigParam) (*mode
        }, nil
 }
 
-func (m MockedNacosConfigClient) CloseClient() {
+func (m *MockedNacosConfigClient) CloseClient() {
        //TODO implement me
        panic("implement me")
 }
@@ -288,7 +296,7 @@ func TestNacosRegistryClient_ListMcpServer(t *testing.T) {
        }
 
        client := NacosRegistryClient{
-               configClient: MockedNacosConfigClient{configs: mockedConfigs},
+               configClient: &MockedNacosConfigClient{configs: mockedConfigs},
        }
 
        servers, err := client.ListMcpServer()
@@ -334,7 +342,7 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
        serverConfigKey113 := fmt.Sprintf("%s-%s-mcp-server.json%smcp-server", 
testMcpServerID, testVersion113, configKeySeparator)
        toolsConfigKey113 := fmt.Sprintf("%s-%s-mcp-tools.json%smcp-tools", 
testMcpServerID, testVersion113, configKeySeparator)
 
-       configClient := MockedNacosConfigClient{
+       configClient := &MockedNacosConfigClient{
                configs: map[string]any{
                        versionConfigKey:   
createExploreServerVersionConfig(testVersion112),
                        serverConfigKey112: 
createMcpServerConfig(testMcpServerID, testVersion112, testServiceName),
@@ -366,15 +374,24 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
 
        // Set up listener for configuration changes
        var newConfig *McpServerConfig
+       var configMutex sync.Mutex
        err = client.ListenToMcpServer(testMcpServerID, func(info 
*McpServerConfig) {
+               configMutex.Lock()
                newConfig = info
+               configMutex.Unlock()
        })
        if err != nil {
                t.Fatalf("Failed to start listening to MCP server: %v", err)
        }
 
        // Wait for initial configuration to be loaded
-       for i := 0; i < testRetryMaxAttempts && newConfig == nil; i++ {
+       for i := 0; i < testRetryMaxAttempts; i++ {
+               configMutex.Lock()
+               cfg := newConfig
+               configMutex.Unlock()
+               if cfg != nil {
+                       break
+               }
                time.Sleep(testRetryInterval)
        }
 
@@ -384,19 +401,26 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
        // Replace nacos template with processed version
        expectedToolsConfig = strings.ReplaceAll(expectedToolsConfig, 
fmt.Sprintf("${nacos.%s/%s}", testConfigKey, testConfigKey), 
fmt.Sprintf(".config.credentials.%s", testCredentialKey))
 
+       configMutex.Lock()
        assert.Equal(t, expectedServerConfig, newConfig.ServerSpecConfig)
        assert.Equal(t, expectedToolsConfig, newConfig.ToolsSpecConfig)
        assert.Equal(t, 1, len(newConfig.Credentials))
        assert.Equal(t, map[string]any{"key": testSecretKey}, 
newConfig.Credentials[testCredentialKey])
+       configMutex.Unlock()
 
        // Test case 1: Change tool nacos template reference
+       configClient.mu.RLock()
        listener := configClient.configListenerMap[toolsConfigKey112][0]
+       configClient.mu.RUnlock()
        updatedToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s", 
testConfigKey1, testConfigKey1))
        listener(testNamespace, "mcp-tools", toolsConfigKey112, 
updatedToolsConfig)
 
        // Wait for tools update to propagate
        for i := 0; i < testRetryMaxAttempts; i++ {
-               if newConfig != nil && 
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey1) {
+               configMutex.Lock()
+               cfg := newConfig
+               configMutex.Unlock()
+               if cfg != nil && strings.Contains(cfg.ToolsSpecConfig, 
testCredentialKey1) {
                        break
                }
                time.Sleep(testRetryInterval)
@@ -404,30 +428,42 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
 
        // Verify updated tools configuration
        expectedUpdatedToolsConfig := strings.ReplaceAll(updatedToolsConfig, 
fmt.Sprintf("${nacos.%s/%s}", testConfigKey1, testConfigKey1), 
fmt.Sprintf(".config.credentials.%s", testCredentialKey1))
+       configMutex.Lock()
        assert.Equal(t, expectedUpdatedToolsConfig, newConfig.ToolsSpecConfig)
        assert.Equal(t, 1, len(newConfig.Credentials))
        assert.Equal(t, map[string]any{"key": testSecretKey1}, 
newConfig.Credentials[testCredentialKey1])
+       configMutex.Unlock()
 
        // Test case 2: Change backend service name
+       configClient.mu.RLock()
        serviceListener := configClient.configListenerMap[serverConfigKey112][0]
+       configClient.mu.RUnlock()
        updatedServerConfig := createMcpServerConfig(testMcpServerID, 
testVersion112, testServiceNameNew)
        serviceListener(testNamespace, "mcp-server", serverConfigKey112, 
updatedServerConfig)
 
        for i := 0; i < testRetryMaxAttempts; i++ {
-               if newConfig != nil && 
strings.Contains(newConfig.ServerSpecConfig, testServiceNameNew) {
+               configMutex.Lock()
+               cfg := newConfig
+               configMutex.Unlock()
+               if cfg != nil && strings.Contains(cfg.ServerSpecConfig, 
testServiceNameNew) {
                        break
                }
                time.Sleep(testRetryInterval)
        }
 
        // Test case 3: Publish new version of MCP server
+       configClient.mu.RLock()
        versionListener := configClient.configListenerMap[versionConfigKey][0]
+       configClient.mu.RUnlock()
        updatedVersionConfig := createExploreServerVersionConfig(testVersion113)
        versionListener(testNamespace, testGroupNameMcpVersions, 
versionConfigKey, updatedVersionConfig)
 
        // Wait for version update to trigger server config change
        for i := 0; i < testRetryMaxAttempts; i++ {
-               if newConfig != nil && 
strings.Contains(newConfig.ServerSpecConfig, fmt.Sprintf("\"version\":\"%s\"", 
testVersion113)) {
+               configMutex.Lock()
+               cfg := newConfig
+               configMutex.Unlock()
+               if cfg != nil && strings.Contains(cfg.ServerSpecConfig, 
fmt.Sprintf("\"version\":\"%s\"", testVersion113)) {
                        break
                }
                time.Sleep(testRetryInterval)
@@ -435,7 +471,10 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
 
        // Wait for tools config to update to new version reference
        for i := 0; i < testRetryMaxAttempts; i++ {
-               if newConfig != nil && 
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey3) {
+               configMutex.Lock()
+               cfg := newConfig
+               configMutex.Unlock()
+               if cfg != nil && strings.Contains(cfg.ToolsSpecConfig, 
testCredentialKey3) {
                        break
                }
                time.Sleep(testRetryInterval)
@@ -446,8 +485,10 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
        expectedFinalToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s", 
testConfigKey3, testConfigKey3))
        expectedFinalToolsConfig = strings.ReplaceAll(expectedFinalToolsConfig, 
fmt.Sprintf("${nacos.%s/%s}", testConfigKey3, testConfigKey3), 
fmt.Sprintf(".config.credentials.%s", testCredentialKey3))
 
+       configMutex.Lock()
        assert.Equal(t, expectedFinalServerConfig, newConfig.ServerSpecConfig)
        assert.Equal(t, expectedFinalToolsConfig, newConfig.ToolsSpecConfig)
        assert.Equal(t, 1, len(newConfig.Credentials))
        assert.Equal(t, map[string]any{"key": testSecretKey3}, 
newConfig.Credentials[testCredentialKey3])
+       configMutex.Unlock()
 }
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager.go 
b/pkg/filter/mcp/mcpserver/transport/session_manager.go
index 1eee222e..ddc8eda4 100644
--- a/pkg/filter/mcp/mcpserver/transport/session_manager.go
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager.go
@@ -42,6 +42,7 @@ type MCPSession struct {
        LastActivity time.Time
        PipeWriter   *io.PipeWriter // Pipe writer for sending SSE messages
        Done         chan struct{}
+       mu           sync.RWMutex // Protects LastActivity field
 }
 
 // SessionManager manages MCP sessions for SSE connections
@@ -70,7 +71,10 @@ func (sm *SessionManager) EnsureSession(sessionIDHeader 
string) (*MCPSession, bo
        // Try to get existing session
        if sessionIDHeader != "" {
                if session, exists := sm.sessions[sessionIDHeader]; exists {
+                       // Update LastActivity using session's own mutex
+                       session.mu.Lock()
                        session.LastActivity = time.Now()
+                       session.mu.Unlock()
                        return session, false // existing session
                }
        }
@@ -92,11 +96,13 @@ func (sm *SessionManager) EnsureSession(sessionIDHeader 
string) (*MCPSession, bo
 // Session retrieves a session by ID
 func (sm *SessionManager) Session(sessionID string) (*MCPSession, bool) {
        sm.mu.RLock()
-       defer sm.mu.RUnlock()
-
        session, exists := sm.sessions[sessionID]
+       sm.mu.RUnlock()
+
        if exists {
+               session.mu.Lock()
                session.LastActivity = time.Now()
+               session.mu.Unlock()
        }
        return session, exists
 }
@@ -168,7 +174,11 @@ func (sm *SessionManager) cleanupExpiredSessions() {
        var toRemove []string
 
        for sessionID, session := range sm.sessions {
-               if now.Sub(session.LastActivity) > SessionTimeout {
+               session.mu.RLock()
+               lastActivity := session.LastActivity
+               session.mu.RUnlock()
+
+               if now.Sub(lastActivity) > SessionTimeout {
                        toRemove = append(toRemove, sessionID)
                }
        }
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager_test.go 
b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
index 93acc5c4..4b888d66 100644
--- a/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
@@ -79,7 +79,15 @@ func TestEnsureSession_ReuseExisting(t *testing.T) {
        }
 
        // Verify last activity was updated
-       if session2.LastActivity.Before(session1.LastActivity) {
+       session1.mu.RLock()
+       lastActivity1 := session1.LastActivity
+       session1.mu.RUnlock()
+
+       session2.mu.RLock()
+       lastActivity2 := session2.LastActivity
+       session2.mu.RUnlock()
+
+       if lastActivity2.Before(lastActivity1) {
                t.Error("LastActivity should be updated")
        }
 }
@@ -152,9 +160,9 @@ func TestSessionCleanup(t *testing.T) {
        sessionID := session.ID
 
        // Manually set old LastActivity to simulate timeout
-       sm.mu.Lock()
+       session.mu.Lock()
        session.LastActivity = time.Now().Add(-SessionTimeout - 1*time.Minute)
-       sm.mu.Unlock()
+       session.mu.Unlock()
 
        // Trigger cleanup
        sm.cleanupExpiredSessions()
diff --git a/pkg/filter/mcp/mcpserver/transport/sse_handler.go 
b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
index 166a9f95..70a3329d 100644
--- a/pkg/filter/mcp/mcpserver/transport/sse_handler.go
+++ b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
@@ -60,7 +60,11 @@ func (h *SSEHandler) SendSSEMessage(session *MCPSession, 
message any) error {
                return fmt.Errorf("failed to write to SSE pipe: %w", err)
        }
 
+       // Update LastActivity using session's mutex
+       session.mu.Lock()
        session.LastActivity = time.Now()
+       session.mu.Unlock()
+
        logger.Debugf("[dubbo-go-pixiu] mcp server sent SSE message to session: 
%s", session.ID)
        return nil
 }

Reply via email to