This is an automated email from the ASF dual-hosted git repository.

wuxinfan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new d3da9280 feat(MCP): enhance streamable http capability (#769)
d3da9280 is described below

commit d3da92806f7c6efe05d64452d53334df9b537da5
Author: Zerui Yang <[email protected]>
AuthorDate: Thu Oct 30 15:02:29 2025 +0800

    feat(MCP): enhance streamable http capability (#769)
    
    * feat(mcp): implement Nacos client and MCP server configuration management
    
    * feat(nacos): add MCP controller and provider registration for Nacos 
integration
    
    * feat(mcp): enhance Nacos integration with improved tool registry and 
configuration synchronization
    
    * fix: rollback
    
    * fix: update dependencies for Nacos integration and improve compatibility
    
    * chore: add Apache License header to multiple files
    
    * fix: update dependencies for improved compatibility and performance
    
    * fix: refactor URL parsing to use constants and add test
    
    * fix: implement thread-safe singleton initialization for ToolRegistry and 
DynamicConsumer
    
    * fix: update Credentials type to use 'any' for improved flexibility
    
    * fix: add unit tests for dynamic consumer and tool registry with 
concurrency support
    
    * fix: update resetNacosTemplateConfigs to clarify its role in resetting 
credential configurations
    
    * fix: update types to use 'any' for improved flexibility in Nacos 
configuration
    
    * fix: enhance DynamicConsumer with debounce functionality and fingerprint 
calculation
    
    * fix: update GetDebounceInfo to use 'any' for improved type flexibility
    
    * fix: reorganize imports in dynamic.go for better structure
    
    * fix: refactor URL parsing logic and enhance error handling in adapter and 
registry components
    
    * test: add unit tests for URL parsing and path extraction functions
    
    * fix: standardize root path return value in ExtractPathFromURL and 
ReplaceGoTemplateArgsInPath functions
    
    * fix: update test case to use example.com for host:port format
    
    * fix: update MCP controller and dynamic consumer to handle server ID in 
configuration updates
    
    * fix: update MCP controller and dynamic consumer to handle server ID in 
configuration updates
    
    * fix: update dependencies in go.mod and go.sum files
    
    * fix: add validation for Nacos addresses and improve logging in MCP adapter
    
    * fix: update go.mod and go.sum to include new Alibaba Cloud SDK 
dependencies
    
    * feat: add documentation to implement streamable HTTP enhancements
    
    * feat: implement content negotiation and session management for MCP SSE 
support
    
    * fix: refactor MCP server components for improved session management and 
SSE handling
    
    * fix: simplify content negotiation for SSE responses in MCP server
    
    * fix: remove doc
    
    * refactor: organize imports and ensure newline at end of files
    
    * fix: update dependencies in go.mod and go.sum
    
    * fix: remove unused global variables and singleton functions from 
dynamic.go
    
    * Revert "fix: correct Pull bot user identification and enhance PR body 
formatting"
    
    This reverts commit 40646cbf
    
    * fix: correct Pull bot user identification in sync-to-upstream.yml
    
    * fix: update NewDynamicConsumer to include SSEHandler and refactor SSE 
message formatting
    
    * fix: reorganize import statements in dynamic.go, dynamic_test.go, and 
globals.go
    
    * fix: replace strings.Replace with strings.ReplaceAll for improved 
readability in client_test.go
    
    * fix: reorganize import statements in client_test.go for improved clarity
    
    * rollback: back sync file
---
 .github/workflows/sync-to-upstream.yml             |  41 +-
 go.mod                                             |  26 +-
 go.sum                                             |  56 +--
 .../mcpserver/registry/nacos/client_test.go        | 117 ++----
 pkg/adapter/mcpserver/registrycenter.go            |   2 +-
 pkg/common/constant/http.go                        |  18 +
 pkg/filter/mcp/mcpserver/context.go                | 106 ++++++
 pkg/filter/mcp/mcpserver/dynamic.go                | 107 ++++--
 .../mcp/mcpserver/dynamic_notification_test.go     | 181 +++++++++
 pkg/filter/mcp/mcpserver/dynamic_test.go           |  65 ++--
 pkg/filter/mcp/mcpserver/filter.go                 | 424 ++++++++++++++++++---
 pkg/filter/mcp/mcpserver/filter_sse_test.go        | 382 +++++++++++++++++++
 pkg/filter/mcp/mcpserver/globals.go                |  85 +++++
 pkg/filter/mcp/mcpserver/handlers.go               | 134 +++++--
 pkg/filter/mcp/mcpserver/registry.go               |   7 -
 pkg/filter/mcp/mcpserver/response.go               |  27 ++
 .../mcp/mcpserver/transport/content_negotiator.go  | 146 +++++++
 .../mcpserver/transport/content_negotiator_test.go | 247 ++++++++++++
 .../mcp/mcpserver/transport/session_manager.go     | 206 ++++++++++
 .../mcpserver/transport/session_manager_test.go    | 272 +++++++++++++
 pkg/filter/mcp/mcpserver/transport/sse_handler.go  |  89 +++++
 .../mcp/mcpserver/transport/sse_handler_test.go    | 226 +++++++++++
 22 files changed, 2663 insertions(+), 301 deletions(-)

diff --git a/.github/workflows/sync-to-upstream.yml 
b/.github/workflows/sync-to-upstream.yml
index 3b1fe8d1..aaad4276 100644
--- a/.github/workflows/sync-to-upstream.yml
+++ b/.github/workflows/sync-to-upstream.yml
@@ -21,11 +21,11 @@ name: Sync to Upstream
 
 # Automatically creates a PR to upstream when a PR is merged to fork.
 # Only executes in fork repositories (checked by github.repository).
-# 
+#
 # Prerequisites:
 # - UPSTREAM_GITHUB_TOKEN secret with 'public_repo' permission
 # - Bot account with Write access to fork repository
-# 
+#
 # Configuration: Modify the env section below for your project
 
 # ============================================================================
@@ -35,14 +35,14 @@ env:
   # Upstream repository
   UPSTREAM_ORG: apache
   UPSTREAM_REPO: dubbo-go-pixiu
-  
+
   # Fork repository
   FORK_ORG: dubbo-go-pixiu
   FORK_REPO: dubbo-go-pixiu
-  
+
   # Branch name
   BASE_BRANCH: develop
-  
+
   # Git bot info
   BOT_NAME: "Pixiu Bot"
   BOT_EMAIL: "[email protected]"
@@ -66,10 +66,10 @@ jobs:
   sync-to-upstream:
     name: Sync to Upstream Repository
     runs-on: ubuntu-latest
-    
+
     # Only run when PR is merged; repository check happens inside steps where 
env is available
     if: github.event.pull_request.merged == true
-    
+
     steps:
       - name: Check repository and target branch
         id: check_branch
@@ -101,21 +101,21 @@ jobs:
           fetch-depth: 0
           ref: ${{ env.BASE_BRANCH }}
           token: ${{ secrets.UPSTREAM_GITHUB_TOKEN }}
-      
+
       # Step 2: Configure Git user
       - name: Configure Git user
         if: steps.check_branch.outputs.skip != 'true'
         run: |
           git config user.name "${{ env.BOT_NAME }}"
           git config user.email "${{ env.BOT_EMAIL }}"
-      
+
       # Step 3: Add upstream remote and fetch
       - name: Add upstream remote
         if: steps.check_branch.outputs.skip != 'true'
         run: |
           git remote add upstream https://github.com/${{ env.UPSTREAM_ORG 
}}/${{ env.UPSTREAM_REPO }}.git
           git fetch upstream ${{ env.BASE_BRANCH }}
-      
+
       # Step 4: Create sync branch with timestamp
       - name: Create sync branch
         if: steps.check_branch.outputs.skip != 'true'
@@ -125,20 +125,20 @@ jobs:
           echo "SYNC_BRANCH=${SYNC_BRANCH}" >> $GITHUB_ENV
           git checkout -b ${SYNC_BRANCH}
           echo "branch=${SYNC_BRANCH}" >> $GITHUB_OUTPUT
-      
+
       # Step 5: Rebase onto upstream base branch
       - name: Rebase onto upstream
         if: steps.check_branch.outputs.skip != 'true'
         id: rebase
         run: |
           git rebase upstream/${{ env.BASE_BRANCH }}
-      
+
       # Step 6: Push sync branch to origin
       - name: Push sync branch
         if: steps.check_branch.outputs.skip != 'true'
         run: |
           git push origin ${SYNC_BRANCH} --force-with-lease
-      
+
       # Step 7: Generate PR body with attribution
       - name: Generate PR description
         if: steps.check_branch.outputs.skip != 'true'
@@ -176,7 +176,7 @@ jobs:
             echo ""
             echo "cc @${ORIGINAL_AUTHOR}"
           } > pr_body.md
-      
+
       - name: Create PR to ${{ env.UPSTREAM_ORG }}/${{ env.UPSTREAM_REPO }}
         if: steps.check_branch.outputs.skip != 'true'
         id: create_pr
@@ -193,7 +193,7 @@ jobs:
           
           echo "pr_url=${PR_URL}" >> $GITHUB_OUTPUT
           echo "✅ Successfully created PR: ${PR_URL}"
-      
+
       - name: Notify original PR
         if: success() && steps.check_branch.outputs.skip != 'true'
         env:
@@ -214,7 +214,7 @@ jobs:
             echo "⚠️  Comment failed but sync succeeded: $UPSTREAM_PR_URL"
             exit 0
           }
-      
+
       - name: Handle rebase failure
         if: failure() && steps.rebase.outcome == 'failure'
         env:
@@ -228,13 +228,13 @@ jobs:
             --repo ${{ env.FORK_ORG }}/${{ env.FORK_REPO }} \
             --title "⚠️ Failed to auto-sync PR #${PR_NUMBER} to upstream" \
             --body "## Sync Failure Report
-            
+          
             **Original PR**: #${PR_NUMBER}
             **Author**: @${PR_AUTHOR}
             **Error**: Rebase conflicts detected
-            
+          
             ### Manual Resolution Required
-            
+          
             \`\`\`bash
             git checkout ${{ env.BASE_BRANCH }}
             git checkout -b manual-sync-${PR_NUMBER}
@@ -245,9 +245,8 @@ jobs:
             git push origin manual-sync-${PR_NUMBER}
             # Create PR to ${{ env.UPSTREAM_ORG }}/${{ env.UPSTREAM_REPO }}
             \`\`\`
-            
+          
             cc @${PR_AUTHOR}" \
             --label "sync-failure,needs-attention"
           
           echo "❌ Rebase failed. Issue created for manual resolution."
-
diff --git a/go.mod b/go.mod
index 0c1803a7..9304f06e 100644
--- a/go.mod
+++ b/go.mod
@@ -30,8 +30,8 @@ require (
        github.com/golang/protobuf v1.5.4
        github.com/jhump/protoreflect v1.17.0
        github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
-       github.com/lestrrat-go/httprc/v3 v3.0.0-beta1
-       github.com/lestrrat-go/jwx/v3 v3.0.0
+       github.com/lestrrat-go/httprc/v3 v3.0.1
+       github.com/lestrrat-go/jwx/v3 v3.0.12
        github.com/mark3labs/mcp-go v0.32.0
        github.com/mitchellh/mapstructure v1.5.0
        github.com/nacos-group/nacos-sdk-go v1.1.3
@@ -44,7 +44,7 @@ require (
        github.com/spf13/cast v1.7.1
        github.com/spf13/cobra v1.5.0
        github.com/spf13/viper v1.8.1
-       github.com/stretchr/testify v1.10.0
+       github.com/stretchr/testify v1.11.1
        github.com/swaggo/files v1.0.1
        github.com/swaggo/gin-swagger v1.6.0
        github.com/swaggo/swag v1.8.12
@@ -60,8 +60,8 @@ require (
        go.opentelemetry.io/otel/sdk/metric v0.32.1
        go.opentelemetry.io/otel/trace v1.10.0
        go.uber.org/zap v1.21.0
-       golang.org/x/crypto v0.41.0
-       golang.org/x/net v0.43.0
+       golang.org/x/crypto v0.43.0
+       golang.org/x/net v0.45.0
        google.golang.org/grpc v1.66.2
        google.golang.org/protobuf v1.36.6
        gopkg.in/yaml.v3 v3.0.1
@@ -178,9 +178,12 @@ require (
        github.com/klauspost/cpuid/v2 v2.2.7 // indirect
        github.com/knadh/koanf v1.5.0 // indirect
        github.com/leodido/go-urn v1.4.0 // indirect
-       github.com/lestrrat-go/blackmagic v1.0.2 // indirect
+       github.com/lestrrat-go/blackmagic v1.0.4 // indirect
+       github.com/lestrrat-go/dsig v1.0.0 // indirect
+       github.com/lestrrat-go/dsig-secp256k1 v1.0.0 // indirect
        github.com/lestrrat-go/httpcc v1.0.1 // indirect
        github.com/lestrrat-go/option v1.0.1 // indirect
+       github.com/lestrrat-go/option/v2 v2.0.0 // indirect
        github.com/lestrrat-go/strftime v1.1.1 // indirect
        github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // 
indirect
        github.com/magiconair/properties v1.8.5 // indirect
@@ -211,7 +214,7 @@ require (
        github.com/prometheus/procfs v0.16.1 // indirect
        github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // 
indirect
        github.com/robfig/cron/v3 v3.0.1 // indirect
-       github.com/segmentio/asm v1.2.0 // indirect
+       github.com/segmentio/asm v1.2.1 // indirect
        github.com/shirou/gopsutil/v3 v3.22.2 // indirect
        github.com/sirupsen/logrus v1.9.0 // indirect
        github.com/smartystreets/assertions v1.2.0 // indirect
@@ -228,6 +231,7 @@ require (
        github.com/uber/jaeger-client-go v2.29.1+incompatible // indirect
        github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
        github.com/ugorji/go/codec v1.2.12 // indirect
+       github.com/valyala/fastjson v1.6.4 // indirect
        github.com/wasmerio/wasmer-go v1.0.3 // indirect
        github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // 
indirect
        github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 
// indirect
@@ -244,11 +248,11 @@ require (
        go.uber.org/multierr v1.8.0 // indirect
        golang.org/x/arch v0.20.0 // indirect
        golang.org/x/oauth2 v0.30.0 // indirect
-       golang.org/x/sync v0.16.0 // indirect
-       golang.org/x/sys v0.35.0 // indirect
-       golang.org/x/text v0.28.0 // indirect
+       golang.org/x/sync v0.17.0 // indirect
+       golang.org/x/sys v0.37.0 // indirect
+       golang.org/x/text v0.30.0 // indirect
        golang.org/x/time v0.12.0 // indirect
-       golang.org/x/tools v0.36.0 // indirect
+       golang.org/x/tools v0.37.0 // indirect
        google.golang.org/genproto/googleapis/api 
v0.0.0-20240604185151-ef581f913117 // indirect
        google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240604185151-ef581f913117 // indirect
        gopkg.in/ini.v1 v1.67.0 // indirect
diff --git a/go.sum b/go.sum
index 57818b99..8da9ce63 100644
--- a/go.sum
+++ b/go.sum
@@ -1129,20 +1129,26 @@ github.com/kylelemons/godebug v1.1.0/go.mod 
h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
 github.com/leodido/go-urn v1.2.2/go.mod 
h1:kUaIbLZWttglzwNuG0pgsh5vuV6u2YcGBYz1hIPjtOQ=
 github.com/leodido/go-urn v1.4.0 
h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
 github.com/leodido/go-urn v1.4.0/go.mod 
h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
-github.com/lestrrat-go/blackmagic v1.0.2 
h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k=
-github.com/lestrrat-go/blackmagic v1.0.2/go.mod 
h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU=
+github.com/lestrrat-go/blackmagic v1.0.4 
h1:IwQibdnf8l2KoO+qC3uT4OaTWsW7tuRQXy9TRN9QanA=
+github.com/lestrrat-go/blackmagic v1.0.4/go.mod 
h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw=
+github.com/lestrrat-go/dsig v1.0.0 
h1:OE09s2r9Z81kxzJYRn07TFM9XA4akrUdoMwr0L8xj38=
+github.com/lestrrat-go/dsig v1.0.0/go.mod 
h1:dEgoOYYEJvW6XGbLasr8TFcAxoWrKlbQvmJgCR0qkDo=
+github.com/lestrrat-go/dsig-secp256k1 v1.0.0 
h1:JpDe4Aybfl0soBvoVwjqDbp+9S1Y2OM7gcrVVMFPOzY=
+github.com/lestrrat-go/dsig-secp256k1 v1.0.0/go.mod 
h1:CxUgAhssb8FToqbL8NjSPoGQlnO4w3LG1P0qPWQm/NU=
 github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc 
h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
 github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod 
h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible 
h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4=
 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod 
h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
 github.com/lestrrat-go/httpcc v1.0.1 
h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE=
 github.com/lestrrat-go/httpcc v1.0.1/go.mod 
h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E=
-github.com/lestrrat-go/httprc/v3 v3.0.0-beta1 
h1:pzDjP9dSONCFQC/AE3mWUnHILGiYPiMKzQIS+weKJXA=
-github.com/lestrrat-go/httprc/v3 v3.0.0-beta1/go.mod 
h1:wdsgouffPvWPEYh8t7PRH/PidR5sfVqt0na4Nhj60Ms=
-github.com/lestrrat-go/jwx/v3 v3.0.0 
h1:IRnFNdZx5dJHjTpPVkYqP6TRahJI2Z9v43UwEDJcj6U=
-github.com/lestrrat-go/jwx/v3 v3.0.0/go.mod 
h1:ak32WoNtHE0aLowVWBcCvXngcAnW4tuC0YhFwOr/kwc=
+github.com/lestrrat-go/httprc/v3 v3.0.1 
h1:3n7Es68YYGZb2Jf+k//llA4FTZMl3yCwIjFIk4ubevI=
+github.com/lestrrat-go/httprc/v3 v3.0.1/go.mod 
h1:2uAvmbXE4Xq8kAUjVrZOq1tZVYYYs5iP62Cmtru00xk=
+github.com/lestrrat-go/jwx/v3 v3.0.12 
h1:p25r68Y4KrbBdYjIsQweYxq794CtGCzcrc5dGzJIRjg=
+github.com/lestrrat-go/jwx/v3 v3.0.12/go.mod 
h1:HiUSaNmMLXgZ08OmGBaPVvoZQgJVOQphSrGr5zMamS8=
 github.com/lestrrat-go/option v1.0.1 
h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
 github.com/lestrrat-go/option v1.0.1/go.mod 
h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
+github.com/lestrrat-go/option/v2 v2.0.0 
h1:XxrcaJESE1fokHy3FpaQ/cXW8ZsIdWcdFzzLOcID3Ss=
+github.com/lestrrat-go/option/v2 v2.0.0/go.mod 
h1:oSySsmzMoR0iRzCDCaUfsCzxQHUEuhOViQObyy7S6Vg=
 github.com/lestrrat-go/strftime v1.1.1 
h1:zgf8QCsgj27GlKBy3SU9/8MMgegZ8UCzlCyHYrUF0QU=
 github.com/lestrrat-go/strftime v1.1.1/go.mod 
h1:YDrzHJAODYQ+xxvrn5SG01uFIQAeDTzpxNVppCz7Nmw=
 github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod 
h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
@@ -1385,8 +1391,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod 
h1:sm1tb6uqfes/u+d4ooFo
 github.com/ryanuber/go-glob v1.0.0/go.mod 
h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod 
h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod 
h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
-github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
-github.com/segmentio/asm v1.2.0/go.mod 
h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
+github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
+github.com/segmentio/asm v1.2.1/go.mod 
h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
 github.com/shirou/gopsutil v3.20.11+incompatible/go.mod 
h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/shirou/gopsutil/v3 v3.21.6/go.mod 
h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
 github.com/shirou/gopsutil/v3 v3.22.2 
h1:wCrArWFkHYIdDxx/FSfF5RB4dpJYW6t7rcp3+zL8uks=
@@ -1461,8 +1467,8 @@ github.com/stretchr/testify v1.8.2/go.mod 
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
 github.com/stretchr/testify v1.8.3/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/stretchr/testify v1.9.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
-github.com/stretchr/testify v1.10.0 
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
-github.com/stretchr/testify v1.10.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/stretchr/testify v1.11.1 
h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod 
h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
 github.com/subosito/gotenv v1.2.0 
h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod 
h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE=
@@ -1505,6 +1511,8 @@ github.com/ugorji/go/codec v1.2.12/go.mod 
h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZ
 github.com/urfave/cli v1.20.0/go.mod 
h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
 github.com/urfave/cli v1.22.1/go.mod 
h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
 github.com/urfave/cli/v2 v2.3.0/go.mod 
h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/valyala/fastjson v1.6.4 
h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
+github.com/valyala/fastjson v1.6.4/go.mod 
h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
 github.com/wasmerio/wasmer-go v1.0.3 
h1:9pWIlIqUKxALvFlWK8+Zy90qyqxd+8wlyVG91txh1TU=
 github.com/wasmerio/wasmer-go v1.0.3/go.mod 
h1:0gzVdSfg6pysA6QVp6iVRPTagC6Wq9pOE8J86WKb2Fk=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod 
h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
@@ -1657,8 +1665,8 @@ golang.org/x/crypto v0.14.0/go.mod 
h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf
 golang.org/x/crypto v0.18.0/go.mod 
h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
 golang.org/x/crypto v0.19.0/go.mod 
h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
 golang.org/x/crypto v0.21.0/go.mod 
h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
-golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
-golang.org/x/crypto v0.41.0/go.mod 
h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
+golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
+golang.org/x/crypto v0.43.0/go.mod 
h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
 golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -1704,8 +1712,8 @@ golang.org/x/mod 
v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
 golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
-golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
+golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U=
+golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI=
 golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1788,8 +1796,8 @@ golang.org/x/net v0.17.0/go.mod 
h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
 golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
 golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
-golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
-golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
+golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
+golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1834,8 +1842,8 @@ golang.org/x/sync 
v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
-golang.org/x/sync v0.16.0/go.mod 
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod 
h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -1956,8 +1964,8 @@ golang.org/x/sys v0.13.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
-golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
@@ -1989,8 +1997,8 @@ golang.org/x/text v0.9.0/go.mod 
h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/text v0.10.0/go.mod 
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/text v0.13.0/go.mod 
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/text v0.14.0/go.mod 
h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
-golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
-golang.org/x/text v0.28.0/go.mod 
h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
+golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
+golang.org/x/text v0.30.0/go.mod 
h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -2074,8 +2082,8 @@ golang.org/x/tools v0.1.12/go.mod 
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
 golang.org/x/tools v0.3.0/go.mod 
h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
 golang.org/x/tools v0.6.0/go.mod 
h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
 golang.org/x/tools v0.7.0/go.mod 
h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
-golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
-golang.org/x/tools v0.36.0/go.mod 
h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
+golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE=
+golang.org/x/tools v0.37.0/go.mod 
h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pkg/adapter/mcpserver/registry/nacos/client_test.go 
b/pkg/adapter/mcpserver/registry/nacos/client_test.go
index 1c953fb9..2b89760d 100644
--- a/pkg/adapter/mcpserver/registry/nacos/client_test.go
+++ b/pkg/adapter/mcpserver/registry/nacos/client_test.go
@@ -22,7 +22,6 @@ import (
        "regexp"
        "sort"
        "strings"
-       "sync"
        "testing"
        "time"
 )
@@ -115,14 +114,11 @@ func createBrokenJSON() string {
 }
 
 type MockedNacosConfigClient struct {
-       mu                sync.Mutex
        configs           map[string]any
        configListenerMap map[string][]func(string, string, string, string)
 }
 
-func (m *MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string, 
error) {
-       m.mu.Lock()
-       defer m.mu.Unlock()
+func (m MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string, 
error) {
        if result, exist := m.configs[param.DataId+"$$"+param.Group]; exist {
                config, ok := result.(string)
                if ok {
@@ -139,19 +135,17 @@ func (m *MockedNacosConfigClient) GetConfig(param 
vo.ConfigParam) (string, error
        return "", nil
 }
 
-func (m *MockedNacosConfigClient) PublishConfig(param vo.ConfigParam) (bool, 
error) {
+func (m MockedNacosConfigClient) PublishConfig(_ vo.ConfigParam) (bool, error) 
{
        //TODO implement me
        panic("implement me")
 }
 
-func (m *MockedNacosConfigClient) DeleteConfig(param vo.ConfigParam) (bool, 
error) {
+func (m MockedNacosConfigClient) DeleteConfig(_ vo.ConfigParam) (bool, error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m *MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err 
error) {
-       m.mu.Lock()
-       defer m.mu.Unlock()
+func (m MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err 
error) {
        if _, ok := m.configListenerMap[params.Group]; !ok {
                m.configListenerMap[params.Group] = []func(string, string, 
string, string){}
        }
@@ -159,25 +153,12 @@ func (m *MockedNacosConfigClient) ListenConfig(params 
vo.ConfigParam) (err error
        return nil
 }
 
-func (m *MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam) 
(err error) {
-       m.mu.Lock()
-       defer m.mu.Unlock()
+func (m MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam) 
(err error) {
        delete(m.configListenerMap, params.DataId+"$$"+params.Group)
        return nil
 }
 
-func (m *MockedNacosConfigClient) GetListener(key string, index int) 
func(string, string, string, string) {
-       m.mu.Lock()
-       defer m.mu.Unlock()
-       if listeners, ok := m.configListenerMap[key]; ok && len(listeners) > 
index {
-               return listeners[index]
-       }
-       return nil
-}
-
-func (m *MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam) 
(*model.ConfigPage, error) {
-       m.mu.Lock()
-       defer m.mu.Unlock()
+func (m MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam) 
(*model.ConfigPage, error) {
        dataIdRegex := strings.ReplaceAll(param.DataId, "*", ".*")
        groupRegex := strings.ReplaceAll(param.Group, "*", ".*")
        result := []model.ConfigItem{}
@@ -213,7 +194,7 @@ func (m *MockedNacosConfigClient) SearchConfig(param 
vo.SearchConfigParam) (*mod
        }, nil
 }
 
-func (m *MockedNacosConfigClient) CloseClient() {
+func (m MockedNacosConfigClient) CloseClient() {
        //TODO implement me
        panic("implement me")
 }
@@ -222,22 +203,22 @@ type MockedNacosNamingClient struct {
        listenerMap map[string][]func(services []model.Instance, err error)
 }
 
-func (m MockedNacosNamingClient) RegisterInstance(param 
vo.RegisterInstanceParam) (bool, error) {
+func (m MockedNacosNamingClient) RegisterInstance(_ vo.RegisterInstanceParam) 
(bool, error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosNamingClient) BatchRegisterInstance(param 
vo.BatchRegisterInstanceParam) (bool, error) {
+func (m MockedNacosNamingClient) BatchRegisterInstance(_ 
vo.BatchRegisterInstanceParam) (bool, error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosNamingClient) DeregisterInstance(param 
vo.DeregisterInstanceParam) (bool, error) {
+func (m MockedNacosNamingClient) DeregisterInstance(_ 
vo.DeregisterInstanceParam) (bool, error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosNamingClient) UpdateInstance(param vo.UpdateInstanceParam) 
(bool, error) {
+func (m MockedNacosNamingClient) UpdateInstance(_ vo.UpdateInstanceParam) 
(bool, error) {
        //TODO implement me
        panic("implement me")
 }
@@ -255,17 +236,17 @@ func (m MockedNacosNamingClient) GetService(param 
vo.GetServiceParam) (model.Ser
        }, nil
 }
 
-func (m MockedNacosNamingClient) SelectAllInstances(param 
vo.SelectAllInstancesParam) ([]model.Instance, error) {
+func (m MockedNacosNamingClient) SelectAllInstances(_ 
vo.SelectAllInstancesParam) ([]model.Instance, error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosNamingClient) SelectInstances(param 
vo.SelectInstancesParam) ([]model.Instance, error) {
+func (m MockedNacosNamingClient) SelectInstances(_ vo.SelectInstancesParam) 
([]model.Instance, error) {
        //TODO implement me
        panic("implement me")
 }
 
-func (m MockedNacosNamingClient) SelectOneHealthyInstance(param 
vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
+func (m MockedNacosNamingClient) SelectOneHealthyInstance(_ 
vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
        //TODO implement me
        panic("implement me")
 }
@@ -278,11 +259,11 @@ func (m MockedNacosNamingClient) Subscribe(param 
*vo.SubscribeParam) error {
        return nil
 }
 
-func (m MockedNacosNamingClient) Unsubscribe(param *vo.SubscribeParam) error {
+func (m MockedNacosNamingClient) Unsubscribe(_ *vo.SubscribeParam) error {
        return nil
 }
 
-func (m MockedNacosNamingClient) GetAllServicesInfo(param 
vo.GetAllServiceInfoParam) (model.ServiceList, error) {
+func (m MockedNacosNamingClient) GetAllServicesInfo(_ 
vo.GetAllServiceInfoParam) (model.ServiceList, error) {
        //TODO implement me
        panic("implement me")
 }
@@ -307,7 +288,7 @@ func TestNacosRegistryClient_ListMcpServer(t *testing.T) {
        }
 
        client := NacosRegistryClient{
-               configClient: &MockedNacosConfigClient{configs: mockedConfigs},
+               configClient: MockedNacosConfigClient{configs: mockedConfigs},
        }
 
        servers, err := client.ListMcpServer()
@@ -353,7 +334,7 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
        serverConfigKey113 := fmt.Sprintf("%s-%s-mcp-server.json%smcp-server", 
testMcpServerID, testVersion113, configKeySeparator)
        toolsConfigKey113 := fmt.Sprintf("%s-%s-mcp-tools.json%smcp-tools", 
testMcpServerID, testVersion113, configKeySeparator)
 
-       configClient := &MockedNacosConfigClient{
+       configClient := MockedNacosConfigClient{
                configs: map[string]any{
                        versionConfigKey:   
createExploreServerVersionConfig(testVersion112),
                        serverConfigKey112: 
createMcpServerConfig(testMcpServerID, testVersion112, testServiceName),
@@ -385,30 +366,15 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
 
        // Set up listener for configuration changes
        var newConfig *McpServerConfig
-       var configMu sync.RWMutex
-
-       // Helper functions to safely access newConfig
-       getConfig := func() *McpServerConfig {
-               configMu.RLock()
-               defer configMu.RUnlock()
-               return newConfig
-       }
-
-       setConfig := func(cfg *McpServerConfig) {
-               configMu.Lock()
-               defer configMu.Unlock()
-               newConfig = cfg
-       }
-
        err = client.ListenToMcpServer(testMcpServerID, func(info 
*McpServerConfig) {
-               setConfig(info)
+               newConfig = info
        })
        if err != nil {
                t.Fatalf("Failed to start listening to MCP server: %v", err)
        }
 
        // Wait for initial configuration to be loaded
-       for i := 0; i < testRetryMaxAttempts && getConfig() == nil; i++ {
+       for i := 0; i < testRetryMaxAttempts && newConfig == nil; i++ {
                time.Sleep(testRetryInterval)
        }
 
@@ -418,21 +384,19 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
        // Replace nacos template with processed version
        expectedToolsConfig = strings.ReplaceAll(expectedToolsConfig, 
fmt.Sprintf("${nacos.%s/%s}", testConfigKey, testConfigKey), 
fmt.Sprintf(".config.credentials.%s", testCredentialKey))
 
-       cfg := getConfig()
-       assert.Equal(t, expectedServerConfig, cfg.ServerSpecConfig)
-       assert.Equal(t, expectedToolsConfig, cfg.ToolsSpecConfig)
-       assert.Equal(t, 1, len(cfg.Credentials))
-       assert.Equal(t, map[string]any{"key": testSecretKey}, 
cfg.Credentials[testCredentialKey])
+       assert.Equal(t, expectedServerConfig, newConfig.ServerSpecConfig)
+       assert.Equal(t, expectedToolsConfig, newConfig.ToolsSpecConfig)
+       assert.Equal(t, 1, len(newConfig.Credentials))
+       assert.Equal(t, map[string]any{"key": testSecretKey}, 
newConfig.Credentials[testCredentialKey])
 
        // Test case 1: Change tool nacos template reference
-       listener := configClient.GetListener(toolsConfigKey112, 0)
+       listener := configClient.configListenerMap[toolsConfigKey112][0]
        updatedToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s", 
testConfigKey1, testConfigKey1))
        listener(testNamespace, "mcp-tools", toolsConfigKey112, 
updatedToolsConfig)
 
        // Wait for tools update to propagate
        for i := 0; i < testRetryMaxAttempts; i++ {
-               cfg := getConfig()
-               if cfg != nil && strings.Contains(cfg.ToolsSpecConfig, 
testCredentialKey1) {
+               if newConfig != nil && 
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey1) {
                        break
                }
                time.Sleep(testRetryInterval)
@@ -440,33 +404,30 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
 
        // Verify updated tools configuration
        expectedUpdatedToolsConfig := strings.ReplaceAll(updatedToolsConfig, 
fmt.Sprintf("${nacos.%s/%s}", testConfigKey1, testConfigKey1), 
fmt.Sprintf(".config.credentials.%s", testCredentialKey1))
-       cfg = getConfig()
-       assert.Equal(t, expectedUpdatedToolsConfig, cfg.ToolsSpecConfig)
-       assert.Equal(t, 1, len(cfg.Credentials))
-       assert.Equal(t, map[string]any{"key": testSecretKey1}, 
cfg.Credentials[testCredentialKey1])
+       assert.Equal(t, expectedUpdatedToolsConfig, newConfig.ToolsSpecConfig)
+       assert.Equal(t, 1, len(newConfig.Credentials))
+       assert.Equal(t, map[string]any{"key": testSecretKey1}, 
newConfig.Credentials[testCredentialKey1])
 
        // Test case 2: Change backend service name
-       serviceListener := configClient.GetListener(serverConfigKey112, 0)
+       serviceListener := configClient.configListenerMap[serverConfigKey112][0]
        updatedServerConfig := createMcpServerConfig(testMcpServerID, 
testVersion112, testServiceNameNew)
        serviceListener(testNamespace, "mcp-server", serverConfigKey112, 
updatedServerConfig)
 
        for i := 0; i < testRetryMaxAttempts; i++ {
-               cfg := getConfig()
-               if cfg != nil && strings.Contains(cfg.ServerSpecConfig, 
testServiceNameNew) {
+               if newConfig != nil && 
strings.Contains(newConfig.ServerSpecConfig, testServiceNameNew) {
                        break
                }
                time.Sleep(testRetryInterval)
        }
 
        // Test case 3: Publish new version of MCP server
-       versionListener := configClient.GetListener(versionConfigKey, 0)
+       versionListener := configClient.configListenerMap[versionConfigKey][0]
        updatedVersionConfig := createExploreServerVersionConfig(testVersion113)
        versionListener(testNamespace, testGroupNameMcpVersions, 
versionConfigKey, updatedVersionConfig)
 
        // Wait for version update to trigger server config change
        for i := 0; i < testRetryMaxAttempts; i++ {
-               cfg := getConfig()
-               if cfg != nil && strings.Contains(cfg.ServerSpecConfig, 
fmt.Sprintf("\"version\":\"%s\"", testVersion113)) {
+               if newConfig != nil && 
strings.Contains(newConfig.ServerSpecConfig, fmt.Sprintf("\"version\":\"%s\"", 
testVersion113)) {
                        break
                }
                time.Sleep(testRetryInterval)
@@ -474,8 +435,7 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
 
        // Wait for tools config to update to new version reference
        for i := 0; i < testRetryMaxAttempts; i++ {
-               cfg := getConfig()
-               if cfg != nil && strings.Contains(cfg.ToolsSpecConfig, 
testCredentialKey3) {
+               if newConfig != nil && 
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey3) {
                        break
                }
                time.Sleep(testRetryInterval)
@@ -486,9 +446,8 @@ func TestNacosRegistryClient_ListenToMcpServer(t 
*testing.T) {
        expectedFinalToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s", 
testConfigKey3, testConfigKey3))
        expectedFinalToolsConfig = strings.ReplaceAll(expectedFinalToolsConfig, 
fmt.Sprintf("${nacos.%s/%s}", testConfigKey3, testConfigKey3), 
fmt.Sprintf(".config.credentials.%s", testCredentialKey3))
 
-       cfg = getConfig()
-       assert.Equal(t, expectedFinalServerConfig, cfg.ServerSpecConfig)
-       assert.Equal(t, expectedFinalToolsConfig, cfg.ToolsSpecConfig)
-       assert.Equal(t, 1, len(cfg.Credentials))
-       assert.Equal(t, map[string]any{"key": testSecretKey3}, 
cfg.Credentials[testCredentialKey3])
+       assert.Equal(t, expectedFinalServerConfig, newConfig.ServerSpecConfig)
+       assert.Equal(t, expectedFinalToolsConfig, newConfig.ToolsSpecConfig)
+       assert.Equal(t, 1, len(newConfig.Credentials))
+       assert.Equal(t, map[string]any{"key": testSecretKey3}, 
newConfig.Credentials[testCredentialKey3])
 }
diff --git a/pkg/adapter/mcpserver/registrycenter.go 
b/pkg/adapter/mcpserver/registrycenter.go
index 9cf5326f..6351f72f 100644
--- a/pkg/adapter/mcpserver/registrycenter.go
+++ b/pkg/adapter/mcpserver/registrycenter.go
@@ -167,7 +167,7 @@ func (a *Adapter) Apply() error {
                        }
 
                        // 1) apply tools dynamically to registry for filter 
usage
-                       if dc := mcpserver.GetOrInitDynamic(); dc != nil {
+                       if dc := mcpserver.GetOrInitDynamicConsumer(); dc != 
nil {
                                if err := 
dc.ApplyMcpServerConfigByServer(serverId, cfg); err != nil {
                                        logger.Errorf("[dubbo-go-pixiu] mcp 
adapter apply server %s config error: %v", serverId, err)
                                }
diff --git a/pkg/common/constant/http.go b/pkg/common/constant/http.go
index 287ac3b9..2156ab90 100644
--- a/pkg/common/constant/http.go
+++ b/pkg/common/constant/http.go
@@ -25,6 +25,7 @@ const (
        HeaderKeyContentLength    = "Content-Length"
        HeaderKeyContentEncoding  = "Content-Encoding"
        HeaderKeyUserAgent        = "User-Agent"
+       HeaderKeyAccept           = "Accept"
 
        HeaderKeyAccessControlAllowOrigin      = "Access-Control-Allow-Origin"
        HeaderKeyAccessControlAllowHeaders     = "Access-Control-Allow-Headers"
@@ -43,6 +44,11 @@ const (
        HeaderValueChunked                = "chunked"
        HeaderValueTextPrefix             = "text/"
 
+       // Media type wildcards for Accept header negotiation
+       MediaTypeWildcard        = "*/*"
+       MediaTypeApplicationWild = "application/*"
+       MediaTypeTextWild        = "text/*"
+
        HeaderValueGzip    = "gzip"
        HeaderValueDeflate = "deflate"
 
@@ -94,6 +100,18 @@ const (
        XForwardedProto            = "X-Forwarded-Proto"
 )
 
+// MCP (Model Context Protocol) specific headers
+const (
+       HeaderKeyMCPProtocolVersion = "Mcp-Protocol-Version"
+       HeaderKeyMCPSessionId       = "Mcp-Session-Id"
+)
+
+// MCP protocol versions
+const (
+       MCPProtocolVersion20250618 = "2025-06-18"
+       MCPProtocolVersion20250326 = "2025-03-26"
+)
+
 // SSE response prefixes
 const (
        SSEData  = "data"
diff --git a/pkg/filter/mcp/mcpserver/context.go 
b/pkg/filter/mcp/mcpserver/context.go
index de9877c6..3b7fa6bf 100644
--- a/pkg/filter/mcp/mcpserver/context.go
+++ b/pkg/filter/mcp/mcpserver/context.go
@@ -34,6 +34,14 @@ type MCPData struct {
        Method string
        // RequestID stores JSON-RPC request ID
        RequestID any
+       // SessionID stores MCP session ID for SSE connections
+       SessionID string
+       // AcceptSSE indicates if client accepts text/event-stream
+       AcceptSSE bool
+       // AcceptJSON indicates if client accepts application/json
+       AcceptJSON bool
+       // ProtocolVersion stores MCP protocol version from header
+       ProtocolVersion string
 }
 
 // MCPContext MCP context wrapper that composes HttpContext and provides 
MCP-specific operations
@@ -109,3 +117,101 @@ func NewMCPContextFromHttpContext(httpCtx 
*contexthttp.HttpContext) *MCPContext
 func (ctx *MCPContext) ClearContentLengthHeader() {
        ctx.Writer.Header().Del(constant.HeaderKeyContentLength)
 }
+
+// SessionID management methods
+
+// SetSessionID sets MCP session ID
+func (ctx *MCPContext) SetSessionID(sessionID string) {
+       ctx.mcpData.SessionID = sessionID
+}
+
+// SessionID gets MCP session ID
+func (ctx *MCPContext) SessionID() string {
+       return ctx.mcpData.SessionID
+}
+
+// HasSession checks if context has a session ID
+func (ctx *MCPContext) HasSession() bool {
+       return ctx.mcpData.SessionID != ""
+}
+
+// Accept header management methods
+
+// SetAcceptSSE sets if client accepts text/event-stream
+func (ctx *MCPContext) SetAcceptSSE(acceptSSE bool) {
+       ctx.mcpData.AcceptSSE = acceptSSE
+}
+
+// AcceptSSE gets if client accepts text/event-stream
+func (ctx *MCPContext) AcceptSSE() bool {
+       return ctx.mcpData.AcceptSSE
+}
+
+// SetAcceptJSON sets if client accepts application/json
+func (ctx *MCPContext) SetAcceptJSON(acceptJSON bool) {
+       ctx.mcpData.AcceptJSON = acceptJSON
+}
+
+// AcceptJSON gets if client accepts application/json
+func (ctx *MCPContext) AcceptJSON() bool {
+       return ctx.mcpData.AcceptJSON
+}
+
+// Protocol version management methods
+
+// SetProtocolVersion sets MCP protocol version
+func (ctx *MCPContext) SetProtocolVersion(version string) {
+       ctx.mcpData.ProtocolVersion = version
+}
+
+// ProtocolVersion gets MCP protocol version
+func (ctx *MCPContext) ProtocolVersion() string {
+       return ctx.mcpData.ProtocolVersion
+}
+
+// ParseAndSetAcceptHeader parses Accept header and sets AcceptSSE/AcceptJSON 
flags
+func (ctx *MCPContext) ParseAndSetAcceptHeader() {
+       acceptHeader := ctx.Request.Header.Get(constant.HeaderKeyAccept)
+       ctx.mcpData.AcceptJSON = acceptHeader == "" || // Default to JSON for 
backward compatibility
+               containsMediaType(acceptHeader, 
constant.HeaderValueApplicationJson) ||
+               containsMediaType(acceptHeader, 
constant.MediaTypeApplicationWild) ||
+               containsMediaType(acceptHeader, constant.MediaTypeWildcard)
+
+       ctx.mcpData.AcceptSSE = containsMediaType(acceptHeader, 
constant.HeaderValueTextEventStream) ||
+               containsMediaType(acceptHeader, constant.MediaTypeTextWild) ||
+               containsMediaType(acceptHeader, constant.MediaTypeWildcard)
+}
+
+// ParseAndSetSessionHeader parses Mcp-Session-Id header and sets session ID
+func (ctx *MCPContext) ParseAndSetSessionHeader() {
+       sessionID := ctx.Request.Header.Get(constant.HeaderKeyMCPSessionId)
+       ctx.mcpData.SessionID = sessionID
+}
+
+// ParseAndSetProtocolVersionHeader parses MCP-Protocol-Version header
+func (ctx *MCPContext) ParseAndSetProtocolVersionHeader() {
+       version := ctx.Request.Header.Get(constant.HeaderKeyMCPProtocolVersion)
+       ctx.mcpData.ProtocolVersion = version
+}
+
+// containsMediaType checks if the Accept header contains the specified media 
type
+func containsMediaType(acceptHeader, mediaType string) bool {
+       if acceptHeader == "" {
+               return false
+       }
+       // Simple contains check - could be enhanced with proper media type 
parsing
+       return len(acceptHeader) > 0 && (acceptHeader == mediaType ||
+               len(acceptHeader) >= len(mediaType) && 
(acceptHeader[:len(mediaType)] == mediaType ||
+                       acceptHeader[len(acceptHeader)-len(mediaType):] == 
mediaType ||
+                       containsSubstring(acceptHeader, mediaType)))
+}
+
+// containsSubstring is a helper function for media type checking
+func containsSubstring(s, substr string) bool {
+       for i := 0; i <= len(s)-len(substr); i++ {
+               if s[i:i+len(substr)] == substr {
+                       return true
+               }
+       }
+       return false
+}
diff --git a/pkg/filter/mcp/mcpserver/dynamic.go 
b/pkg/filter/mcp/mcpserver/dynamic.go
index a2b1c718..737fe6c4 100644
--- a/pkg/filter/mcp/mcpserver/dynamic.go
+++ b/pkg/filter/mcp/mcpserver/dynamic.go
@@ -20,6 +20,7 @@ package mcpserver
 import (
        "crypto/sha256"
        "encoding/hex"
+       "encoding/json"
        "fmt"
        "sort"
        "sync"
@@ -27,6 +28,7 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
@@ -39,15 +41,6 @@ const (
        EmptyFingerprint = "00000000"
 )
 
-var (
-       globalRegistry *ToolRegistry
-       globalDynamic  *DynamicConsumer
-
-       // sync.Once variables for thread-safe singleton initialization
-       registryOnce sync.Once
-       dynamicOnce  sync.Once
-)
-
 // ServerToolConfig tool configuration for a single server
 type ServerToolConfig struct {
        Tools       []model.ToolConfig
@@ -55,25 +48,11 @@ type ServerToolConfig struct {
        LastApplied time.Time
 }
 
-// GetOrInitRegistry returns a singleton ToolRegistry
-func GetOrInitRegistry() *ToolRegistry {
-       registryOnce.Do(func() {
-               globalRegistry = NewToolRegistry()
-       })
-       return globalRegistry
-}
-
-// GetOrInitDynamic returns a singleton DynamicConsumer
-func GetOrInitDynamic() *DynamicConsumer {
-       dynamicOnce.Do(func() {
-               globalDynamic = NewDynamicConsumer(GetOrInitRegistry())
-       })
-       return globalDynamic
-}
-
 // DynamicConsumer applies dynamic MCP configurations into the registry
 type DynamicConsumer struct {
-       registry *ToolRegistry
+       registry       *ToolRegistry
+       sessionManager *transport.SessionManager
+       sseHandler     *transport.SSEHandler
 
        // Tool configuration management grouped by server
        mu            sync.RWMutex
@@ -81,11 +60,13 @@ type DynamicConsumer struct {
        debounceTime  time.Duration
 }
 
-func NewDynamicConsumer(reg *ToolRegistry) *DynamicConsumer {
+func NewDynamicConsumer(reg *ToolRegistry, sm *transport.SessionManager, 
sseHandler *transport.SSEHandler) *DynamicConsumer {
        return &DynamicConsumer{
-               registry:      reg,
-               serverConfigs: make(map[string]*ServerToolConfig),
-               debounceTime:  DefaultDebounceTime,
+               registry:       reg,
+               sessionManager: sm,
+               sseHandler:     sseHandler,
+               serverConfigs:  make(map[string]*ServerToolConfig),
+               debounceTime:   DefaultDebounceTime,
        }
 }
 
@@ -144,6 +125,9 @@ func (d *DynamicConsumer) 
ApplyMcpServerConfigByServer(serverId string, cfg *mod
        logger.Infof("[dubbo-go-pixiu] mcp server %s config applied: %d tools, 
total servers: %d, merged tools: %d",
                serverId, len(cfg.Tools), len(d.serverConfigs), 
len(mergedTools))
 
+       // Notify all connected clients about tools list change
+       d.notifyToolsListChanged()
+
        return nil
 }
 
@@ -166,7 +150,8 @@ func (d *DynamicConsumer) calculateFingerprint(tools 
[]model.ToolConfig) string
        // Build hash input string
        hash := sha256.New()
        for _, tool := range sortedTools {
-               fmt.Fprintf(hash, "name:%s;cluster:%s;args:%d;", tool.Name, 
tool.Cluster, len(tool.Args))
+               _, _ = fmt.Fprintf(hash, "name:%s;cluster:%s;args:%d;", 
tool.Name, tool.Cluster, len(tool.Args))
+
        }
 
        // Return first 8 characters of hex encoded hash
@@ -246,3 +231,63 @@ func (d *DynamicConsumer) calculateCurrentMergedTools() 
[]model.ToolConfig {
 
        return allTools
 }
+
+// notifyToolsListChanged sends notifications/tools/list_changed to all 
connected clients
+func (d *DynamicConsumer) notifyToolsListChanged() {
+       if d.sessionManager == nil {
+               logger.Debugf("[dubbo-go-pixiu] mcp server session manager not 
available, skip tools list_changed notification")
+               return
+       }
+
+       // Get all active sessions
+       sessionIDs := d.sessionManager.AllSessionIDs()
+       if len(sessionIDs) == 0 {
+               logger.Debugf("[dubbo-go-pixiu] mcp server no active sessions, 
skip tools list_changed notification")
+               return
+       }
+
+       // Send notification to each session
+       successCount := 0
+       for _, sessionID := range sessionIDs {
+               if err := d.sendToolsListChangedNotification(sessionID); err != 
nil {
+                       logger.Warnf("[dubbo-go-pixiu] mcp server failed to 
send tools list_changed to session %s: %v", sessionID, err)
+               } else {
+                       successCount++
+               }
+       }
+
+       logger.Infof("[dubbo-go-pixiu] mcp server sent tools/list_changed 
notification to %d/%d sessions", successCount, len(sessionIDs))
+}
+
+// sendToolsListChangedNotification sends notification to a specific session
+func (d *DynamicConsumer) sendToolsListChangedNotification(sessionID string) 
error {
+       session, exists := d.sessionManager.Session(sessionID)
+       if !exists {
+               return fmt.Errorf("session not found")
+       }
+
+       if session.PipeWriter == nil {
+               return fmt.Errorf("SSE pipe not established")
+       }
+
+       // Build tools/list_changed notification (no params needed)
+       notification := map[string]any{
+               "jsonrpc": "2.0",
+               "method":  "notifications/tools/list_changed",
+       }
+
+       messageJSON, err := json.Marshal(notification)
+       if err != nil {
+               return fmt.Errorf("failed to marshal notification: %w", err)
+       }
+
+       sseData := d.sseHandler.FormatSSEMessage(string(messageJSON))
+
+       if _, err := session.PipeWriter.Write([]byte(sseData)); err != nil {
+               return fmt.Errorf("failed to write to SSE pipe: %w", err)
+       }
+
+       session.LastActivity = time.Now()
+       logger.Debugf("[dubbo-go-pixiu] mcp server sent tools/list_changed to 
session: %s", sessionID)
+       return nil
+}
diff --git a/pkg/filter/mcp/mcpserver/dynamic_notification_test.go 
b/pkg/filter/mcp/mcpserver/dynamic_notification_test.go
new file mode 100644
index 00000000..a74592d9
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/dynamic_notification_test.go
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+       "io"
+       "strings"
+       "testing"
+       "time"
+)
+
+import (
+       "github.com/stretchr/testify/assert"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func TestNotifyToolsListChanged(t *testing.T) {
+       // Reset global state
+       ResetGlobalState()
+       defer ResetGlobalState()
+
+       consumer := GetOrInitDynamicConsumer()
+       sm := GetOrInitSessionManager()
+
+       // Create session with SSE pipe
+       session, _ := sm.EnsureSession("")
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+       defer pipeReader.Close()
+       defer pipeWriter.Close()
+
+       // Apply config with tools (will trigger notification)
+       config := createTestMcpServerConfig([]model.ToolConfig{
+               {Name: "tool1", Cluster: "cluster1"},
+       })
+
+       // Read notification in background
+       notificationCh := make(chan string, 1)
+       go func() {
+               buf := make([]byte, 1024)
+               n, err := pipeReader.Read(buf)
+               if err == nil && n > 0 {
+                       notificationCh <- string(buf[:n])
+               }
+       }()
+
+       // Apply config
+       consumer.ResetDebounceState()
+       err := consumer.ApplyMcpServerConfigByServer("server1", config)
+       assert.NoError(t, err)
+
+       // Wait for notification
+       select {
+       case notification := <-notificationCh:
+               // Verify notification format
+               assert.True(t, strings.HasPrefix(notification, "data:"), 
"Should start with 'data:'")
+               assert.True(t, strings.HasSuffix(notification, "\n\n"), "Should 
end with \\n\\n")
+               assert.True(t, strings.Contains(notification, 
"notifications/tools/list_changed"), "Should contain method name")
+               assert.True(t, strings.Contains(notification, "jsonrpc"), 
"Should contain jsonrpc field")
+       case <-time.After(1 * time.Second):
+               t.Fatal("Did not receive notification within timeout")
+       }
+}
+
+func TestNotifyToolsListChanged_MultipleSessions(t *testing.T) {
+       ResetGlobalState()
+       defer ResetGlobalState()
+
+       consumer := GetOrInitDynamicConsumer()
+       sm := GetOrInitSessionManager()
+
+       // Create multiple sessions with pipes
+       numSessions := 3
+       readers := make([]io.ReadCloser, numSessions)
+       writers := make([]*io.PipeWriter, numSessions)
+       notificationChs := make([]chan string, numSessions)
+
+       for i := 0; i < numSessions; i++ {
+               session, _ := sm.EnsureSession("")
+               pipeReader, pipeWriter := io.Pipe()
+               session.PipeWriter = pipeWriter
+               readers[i] = pipeReader
+               writers[i] = pipeWriter
+               defer pipeReader.Close()
+               defer pipeWriter.Close()
+
+               // Start reader for each session
+               notificationChs[i] = make(chan string, 1)
+               go func(idx int, reader io.Reader, ch chan string) {
+                       buf := make([]byte, 1024)
+                       n, err := reader.Read(buf)
+                       if err == nil && n > 0 {
+                               ch <- string(buf[:n])
+                       }
+               }(i, pipeReader, notificationChs[i])
+       }
+
+       // Apply config
+       config := createTestMcpServerConfig([]model.ToolConfig{
+               {Name: "tool1", Cluster: "cluster1"},
+       })
+
+       consumer.ResetDebounceState()
+       err := consumer.ApplyMcpServerConfigByServer("server1", config)
+       assert.NoError(t, err)
+
+       // Verify all sessions received notification
+       receivedCount := 0
+       for i := 0; i < numSessions; i++ {
+               select {
+               case notification := <-notificationChs[i]:
+                       assert.Contains(t, notification, 
"notifications/tools/list_changed")
+                       receivedCount++
+               case <-time.After(1 * time.Second):
+                       t.Logf("Session %d did not receive notification", i)
+               }
+       }
+
+       assert.Equal(t, numSessions, receivedCount, "All sessions should 
receive notification")
+}
+
+func TestNotifyToolsListChanged_NoActiveSessions(t *testing.T) {
+       ResetGlobalState()
+       defer ResetGlobalState()
+
+       consumer := GetOrInitDynamicConsumer()
+
+       // Apply config without any active sessions
+       config := createTestMcpServerConfig([]model.ToolConfig{
+               {Name: "tool1", Cluster: "cluster1"},
+       })
+
+       consumer.ResetDebounceState()
+       err := consumer.ApplyMcpServerConfigByServer("server1", config)
+       assert.NoError(t, err)
+
+       // Should not panic or error, just skip notification
+       assert.Equal(t, 0, GetOrInitSessionManager().ActiveSessionCount())
+}
+
+func TestNotifyToolsListChanged_DisconnectedSession(t *testing.T) {
+       ResetGlobalState()
+       defer ResetGlobalState()
+
+       consumer := GetOrInitDynamicConsumer()
+       sm := GetOrInitSessionManager()
+
+       // Create session but don't attach pipe
+       _, _ = sm.EnsureSession("")
+       // session.PipeWriter is nil
+
+       // Apply config
+       config := createTestMcpServerConfig([]model.ToolConfig{
+               {Name: "tool1", Cluster: "cluster1"},
+       })
+
+       consumer.ResetDebounceState()
+       err := consumer.ApplyMcpServerConfigByServer("server1", config)
+       assert.NoError(t, err)
+
+       // Should handle disconnected session gracefully (logged as warning)
+       // No panic or error
+}
diff --git a/pkg/filter/mcp/mcpserver/dynamic_test.go 
b/pkg/filter/mcp/mcpserver/dynamic_test.go
index 1e95b67c..e221402d 100644
--- a/pkg/filter/mcp/mcpserver/dynamic_test.go
+++ b/pkg/filter/mcp/mcpserver/dynamic_test.go
@@ -29,20 +29,13 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
 
 // 
=============================================================================
 // Test Utilities
 // 
=============================================================================
-// resetSingletons resets singleton state for testing
-func resetSingletons() {
-       globalRegistry = nil
-       globalDynamic = nil
-       registryOnce = sync.Once{}
-       dynamicOnce = sync.Once{}
-}
-
 // createTestToolConfig creates a simple test tool configuration
 func createTestToolConfig(name, description string) model.ToolConfig {
        return model.ToolConfig{
@@ -83,7 +76,7 @@ func createTestMcpServerConfig(tools []model.ToolConfig) 
*model.McpServerConfig
 // 
=============================================================================
 
 func TestSingletonInstances(t *testing.T) {
-       resetSingletons()
+       ResetGlobalState()
 
        t.Run("Registry singleton", func(t *testing.T) {
                registry1 := GetOrInitRegistry()
@@ -92,15 +85,15 @@ func TestSingletonInstances(t *testing.T) {
        })
 
        t.Run("Dynamic consumer singleton", func(t *testing.T) {
-               dynamic1 := GetOrInitDynamic()
-               dynamic2 := GetOrInitDynamic()
+               dynamic1 := GetOrInitDynamicConsumer()
+               dynamic2 := GetOrInitDynamicConsumer()
                assert.Same(t, dynamic1, dynamic2)
                assert.Same(t, dynamic1.registry, GetOrInitRegistry())
        })
 }
 
 func TestSingletonConcurrency(t *testing.T) {
-       resetSingletons()
+       ResetGlobalState()
 
        const numGoroutines = 50
        var wg sync.WaitGroup
@@ -130,7 +123,10 @@ func TestSingletonConcurrency(t *testing.T) {
 func TestApplyMcpServerConfig(t *testing.T) {
        t.Run("Basic configuration application", func(t *testing.T) {
                registry := NewToolRegistry()
-               consumer := NewDynamicConsumer(registry)
+               sm := transport.NewSessionManager()
+               defer sm.Stop()
+               sseHandler := transport.NewSSEHandler(sm)
+               consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
                // Test nil config
                err := consumer.ApplyMcpServerConfigByServer("default", nil)
@@ -160,7 +156,10 @@ func TestApplyMcpServerConfig(t *testing.T) {
 
        t.Run("Configuration replacement", func(t *testing.T) {
                registry := NewToolRegistry()
-               consumer := NewDynamicConsumer(registry)
+               sm := transport.NewSessionManager()
+               defer sm.Stop()
+               sseHandler := transport.NewSSEHandler(sm)
+               consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
                // Apply first config
                config1 := createTestMcpServerConfig([]model.ToolConfig{
@@ -187,9 +186,9 @@ func TestApplyMcpServerConfig(t *testing.T) {
 }
 
 func TestApplyMcpServerConfigConcurrent(t *testing.T) {
-       resetSingletons()
+       ResetGlobalState()
        registry := GetOrInitRegistry()
-       consumer := GetOrInitDynamic()
+       consumer := GetOrInitDynamicConsumer()
 
        const numGoroutines = 10
        var wg sync.WaitGroup
@@ -222,7 +221,10 @@ func TestApplyMcpServerConfigConcurrent(t *testing.T) {
 func TestDebounceFeatures(t *testing.T) {
        t.Run("Content debounce - skip identical configs", func(t *testing.T) {
                registry := NewToolRegistry()
-               consumer := NewDynamicConsumer(registry)
+               sm := transport.NewSessionManager()
+               defer sm.Stop()
+               sseHandler := transport.NewSSEHandler(sm)
+               consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
                config := createTestMcpServerConfig([]model.ToolConfig{
                        createTestToolConfig("tool1", "Test tool"),
@@ -245,7 +247,10 @@ func TestDebounceFeatures(t *testing.T) {
 
        t.Run("Time debounce - skip rapid calls", func(t *testing.T) {
                registry := NewToolRegistry()
-               consumer := NewDynamicConsumer(registry)
+               sm := transport.NewSessionManager()
+               defer sm.Stop()
+               sseHandler := transport.NewSSEHandler(sm)
+               consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
                config1 := createTestMcpServerConfig([]model.ToolConfig{
                        createTestToolConfig("tool1", "First tool"),
@@ -271,7 +276,10 @@ func TestDebounceFeatures(t *testing.T) {
 
        t.Run("Empty configuration handling", func(t *testing.T) {
                registry := NewToolRegistry()
-               consumer := NewDynamicConsumer(registry)
+               sm := transport.NewSessionManager()
+               defer sm.Stop()
+               sseHandler := transport.NewSSEHandler(sm)
+               consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
                // Add tool first
                config := createTestMcpServerConfig([]model.ToolConfig{
@@ -296,7 +304,10 @@ func TestDebounceFeatures(t *testing.T) {
 
 func TestDebounceConfiguration(t *testing.T) {
        registry := NewToolRegistry()
-       consumer := NewDynamicConsumer(registry)
+       sm := transport.NewSessionManager()
+       defer sm.Stop()
+       sseHandler := transport.NewSSEHandler(sm)
+       consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
        // Test default debounce time
        info := consumer.GetDebounceInfo()
@@ -329,7 +340,10 @@ func TestDebounceConfiguration(t *testing.T) {
 
 func TestFingerprintCalculation(t *testing.T) {
        registry := NewToolRegistry()
-       consumer := NewDynamicConsumer(registry)
+       sm := transport.NewSessionManager()
+       defer sm.Stop()
+       sseHandler := transport.NewSSEHandler(sm)
+       consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
        // Empty tools
        fingerprint1 := consumer.calculateFingerprint([]model.ToolConfig{})
@@ -362,10 +376,10 @@ func TestFingerprintCalculation(t *testing.T) {
 // 
=============================================================================
 
 func TestIntegration(t *testing.T) {
-       resetSingletons()
+       ResetGlobalState()
 
        registry := GetOrInitRegistry()
-       consumer := GetOrInitDynamic()
+       consumer := GetOrInitDynamicConsumer()
 
        // Verify initial state
        assert.Empty(t, registry.ListTools())
@@ -402,7 +416,10 @@ func TestIntegration(t *testing.T) {
 
 func BenchmarkApplyMcpServerConfig(b *testing.B) {
        registry := NewToolRegistry()
-       consumer := NewDynamicConsumer(registry)
+       sm := transport.NewSessionManager()
+       defer sm.Stop()
+       sseHandler := transport.NewSSEHandler(sm)
+       consumer := NewDynamicConsumer(registry, sm, sseHandler)
 
        config := createTestMcpServerConfig([]model.ToolConfig{
                createTestToolConfig("tool1", "First tool"),
diff --git a/pkg/filter/mcp/mcpserver/filter.go 
b/pkg/filter/mcp/mcpserver/filter.go
index 1ca5908e..1a56cb75 100644
--- a/pkg/filter/mcp/mcpserver/filter.go
+++ b/pkg/filter/mcp/mcpserver/filter.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "net/http"
+       "time"
 )
 
 import (
@@ -29,8 +30,10 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
        contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
@@ -45,10 +48,13 @@ type (
 
        // MCPServerFilter is a filter that handles MCP protocol.
        MCPServerFilter struct {
-               cfg             *model.McpServerConfig
-               registry        *ToolRegistry
-               errorHandler    *ErrorHandler
-               responseBuilder *ResponseBuilder
+               cfg               *model.McpServerConfig
+               registry          *ToolRegistry
+               errorHandler      *ErrorHandler
+               responseBuilder   *ResponseBuilder
+               sessionManager    *transport.SessionManager
+               sseHandler        *transport.SSEHandler
+               contentNegotiator *transport.ContentNegotiator
        }
 )
 
@@ -97,11 +103,19 @@ func (f *FilterFactory) Config() any {
 
 // PrepareFilterChain prepares the filter chain
 func (f *FilterFactory) PrepareFilterChain(_ *contexthttp.HttpContext, chain 
filter.FilterChain) error {
+       // Get global session manager singleton
+       sessionManager := GetOrInitSessionManager()
+       sseHandler := transport.NewSSEHandler(sessionManager)
+       contentNegotiator := transport.NewContentNegotiator()
+
        mcpFilter := &MCPServerFilter{
-               cfg:             f.cfg,
-               registry:        f.registry,
-               errorHandler:    NewErrorHandler(),
-               responseBuilder: NewResponseBuilder(),
+               cfg:               f.cfg,
+               registry:          f.registry,
+               errorHandler:      NewErrorHandler(),
+               responseBuilder:   NewResponseBuilder(),
+               sessionManager:    sessionManager,
+               sseHandler:        sseHandler,
+               contentNegotiator: contentNegotiator,
        }
        chain.AppendDecodeFilters(mcpFilter)
        chain.AppendEncodeFilters(mcpFilter) // Add to Encode chain
@@ -118,36 +132,25 @@ func (f *MCPServerFilter) Decode(ctx 
*contexthttp.HttpContext) filter.FilterStat
        // Create MCP context wrapper
        mcpCtx := NewMCPContext(ctx)
 
-       // Read request body
-       body, err := io.ReadAll(ctx.Request.Body)
-       if err != nil {
-               logger.Errorf("[dubbo-go-pixiu] mcp server failed to read 
request body: %v", err)
-               return f.errorHandler.SendInternalError(mcpCtx, nil, "failed to 
read request body")
-       }
+       // Parse HTTP headers
+       mcpCtx.ParseAndSetProtocolVersionHeader()
+       mcpCtx.ParseAndSetSessionHeader()
+       mcpCtx.ParseAndSetAcceptHeader()
 
-       // Parse JSON-RPC request
-       var jsonrpcReq mcp.JSONRPCRequest
-       if err := json.Unmarshal(body, &jsonrpcReq); err != nil {
-               logger.Errorf("[dubbo-go-pixiu] mcp server failed to parse 
JSON-RPC request: %v", err)
-               return f.errorHandler.SendInternalError(mcpCtx, nil, "invalid 
JSON-RPC request")
+       // Protocol version is logged for debugging
+       version := mcpCtx.ProtocolVersion()
+       if version != "" {
+               logger.Debugf("[dubbo-go-pixiu] mcp server client protocol 
version: %s", version)
        }
 
-       logger.Infof("[dubbo-go-pixiu] mcp server received request: %s (id: 
%v)", jsonrpcReq.Method, jsonrpcReq.ID)
-
-       // Store information in MCP context
-       mcpCtx.SetMCPMethod(jsonrpcReq.Method)
-       mcpCtx.SetMCPRequestID(jsonrpcReq.ID)
-
-       // Handle terminal methods (methods that don't need forwarding to 
backend)
-       if f.isTerminalMethod(jsonrpcReq.Method) {
-               return f.handleTerminalMethod(mcpCtx, jsonrpcReq)
-       } else if jsonrpcReq.Method == string(mcp.MethodToolsCall) {
-               // Tool call will be processed in Encode stage (IsMCPToolCall() 
checks method)
-               return f.handleToolCall(mcpCtx, jsonrpcReq)
-       } else {
-               // Unknown method
-               logger.Warnf("[dubbo-go-pixiu] mcp server unsupported method: 
%s", jsonrpcReq.Method)
-               return f.errorHandler.SendMethodNotFound(mcpCtx, jsonrpcReq.ID)
+       // Dispatch based on HTTP method
+       switch ctx.Request.Method {
+       case constant.Get:
+               return f.handleGetRequest(mcpCtx)
+       case constant.Post:
+               return f.handlePostRequest(mcpCtx)
+       default:
+               return f.sendMethodNotAllowed(mcpCtx)
        }
 }
 
@@ -163,33 +166,6 @@ func (f *MCPServerFilter) isTerminalMethod(method string) 
bool {
        }
 }
 
-// handleTerminalMethod handles terminal methods
-func (f *MCPServerFilter) handleTerminalMethod(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
-       switch req.Method {
-       case string(mcp.MethodInitialize):
-               return f.handleInitialize(ctx, req)
-       case string(mcp.MethodToolsList):
-               return f.handleToolsList(ctx, req)
-       case string(mcp.MethodResourcesList):
-               return f.handleResourcesList(ctx, req)
-       case string(mcp.MethodResourcesRead):
-               return f.handleResourceRead(ctx, req)
-       case "resources/templates/list":
-               return f.handleResourceTemplatesList(ctx, req)
-       case string(mcp.MethodPromptsList):
-               return f.handlePromptsList(ctx, req)
-       case string(mcp.MethodPromptsGet):
-               return f.handlePromptsGet(ctx, req)
-       case "notifications/initialized":
-               return f.handleNotificationsInitialized(ctx, req)
-       case string(mcp.MethodPing):
-               return f.handlePing(ctx, req)
-       default:
-               logger.Warnf("[dubbo-go-pixiu] mcp server unsupported method: 
%s", req.Method)
-               return f.errorHandler.SendMethodNotFound(ctx, req.ID)
-       }
-}
-
 // Encode processes outgoing HTTP responses.
 func (f *MCPServerFilter) Encode(ctx *contexthttp.HttpContext) 
filter.FilterStatus {
        // Create MCP context wrapper and load stored MCP data
@@ -235,3 +211,327 @@ func (f *MCPServerFilter) sendJSONResponse(ctx 
*MCPContext, response any) filter
        ctx.SendLocalReply(http.StatusOK, responseBody)
        return filter.Stop
 }
+
+// handleGetRequest handles HTTP GET requests for SSE stream establishment
+func (f *MCPServerFilter) handleGetRequest(ctx *MCPContext) 
filter.FilterStatus {
+       logger.Infof("[dubbo-go-pixiu] mcp server handling GET request for SSE 
stream")
+
+       // Validate Accept header includes text/event-stream
+       if !ctx.AcceptSSE() {
+               logger.Warnf("[dubbo-go-pixiu] mcp server GET request must 
accept text/event-stream")
+               return f.sendNotAcceptable(ctx, "GET request must accept 
text/event-stream")
+       }
+
+       // Get or create session
+       sessionIDHeader := ctx.SessionID()
+       session, isNewSession := f.sessionManager.EnsureSession(sessionIDHeader)
+       ctx.SetSessionID(session.ID)
+
+       // Create io.Pipe for SSE message transport
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+
+       // Create virtual HTTP response with pipe as body
+       virtualResp := &http.Response{
+               StatusCode: http.StatusOK,
+               Header: http.Header{
+                       constant.HeaderKeyContextType:              
[]string{constant.HeaderValueTextEventStream},
+                       constant.HeaderKeyCacheControl:             
[]string{constant.HeaderValueNoCache},
+                       constant.HeaderKeyConnection:               
[]string{constant.HeaderValueKeepAlive},
+                       constant.HeaderKeyMCPSessionId:             
[]string{session.ID},
+                       constant.HeaderKeyAccessControlAllowOrigin: 
[]string{constant.HeaderValueAll},
+               },
+               Body: pipeReader,
+       }
+
+       // Set SourceResp to let buildTargetResponse convert to StreamResponse
+       ctx.SourceResp = virtualResp
+       ctx.StatusCode(http.StatusOK)
+
+       // Start background goroutine to maintain the SSE connection
+       go f.maintainSSEPipe(ctx, session)
+
+       if isNewSession {
+               logger.Infof("[dubbo-go-pixiu] mcp server established new SSE 
stream for session: %s", session.ID)
+       } else {
+               logger.Infof("[dubbo-go-pixiu] mcp server resumed SSE stream 
for existing session: %s", session.ID)
+       }
+
+       // Return Stop to skip remaining filters and backend call
+       return filter.Stop
+}
+
+// handlePostRequest handles HTTP POST requests with JSON-RPC messages
+func (f *MCPServerFilter) handlePostRequest(ctx *MCPContext) 
filter.FilterStatus {
+       logger.Debugf("[dubbo-go-pixiu] mcp server handling POST request")
+
+       // Read request body
+       body, err := io.ReadAll(ctx.Request.Body)
+       if err != nil {
+               logger.Errorf("[dubbo-go-pixiu] mcp server failed to read 
request body: %v", err)
+               return f.errorHandler.SendInternalError(ctx, nil, "failed to 
read request body")
+       }
+
+       // Parse JSON-RPC request
+       var jsonrpcReq mcp.JSONRPCRequest
+       if err := json.Unmarshal(body, &jsonrpcReq); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] mcp server failed to parse 
JSON-RPC request: %v", err)
+               return f.errorHandler.SendInternalError(ctx, nil, "invalid 
JSON-RPC request")
+       }
+
+       logger.Infof("[dubbo-go-pixiu] mcp server received POST request: %s 
(id: %v)", jsonrpcReq.Method, jsonrpcReq.ID)
+
+       // Store information in MCP context
+       ctx.SetMCPMethod(jsonrpcReq.Method)
+       ctx.SetMCPRequestID(jsonrpcReq.ID)
+
+       // Determine response format based on content negotiation
+       sessionID := ctx.SessionID()
+       hasSession := sessionID != "" && f.sessionExists(sessionID)
+       responseFormat := f.contentNegotiator.NegotiateResponse(
+               ctx.Request.Header.Get(constant.HeaderKeyAccept), hasSession)
+
+       // Store response format decision in context for later use
+       ctx.StoreMCPDataInParams()
+
+       // Handle different request types
+       if f.isTerminalMethod(jsonrpcReq.Method) {
+               return f.handleTerminalMethodWithNegotiation(ctx, jsonrpcReq, 
responseFormat)
+       } else if jsonrpcReq.Method == string(mcp.MethodToolsCall) {
+               return f.handleToolCallWithNegotiation(ctx, jsonrpcReq, 
responseFormat)
+       } else {
+               // Unknown method
+               logger.Warnf("[dubbo-go-pixiu] mcp server unsupported method: 
%s", jsonrpcReq.Method)
+               return f.errorHandler.SendMethodNotFound(ctx, jsonrpcReq.ID)
+       }
+}
+
+// handleTerminalMethodWithNegotiation handles terminal methods with response 
format negotiation
+func (f *MCPServerFilter) handleTerminalMethodWithNegotiation(ctx *MCPContext, 
req mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
+       // Special case 1: initialize always returns JSON immediately (no SSE 
option per MCP spec)
+       if req.Method == string(mcp.MethodInitialize) {
+               return f.handleInitialize(ctx, req)
+       }
+
+       // Special case 2: notifications/initialized always returns 202 
Accepted with no body
+       if req.Method == "notifications/initialized" {
+               logger.Infof("[dubbo-go-pixiu] mcp server received initialized 
notification, returning 202 Accepted")
+               ctx.SendLocalReply(http.StatusAccepted, nil)
+               return filter.Stop
+       }
+
+       // For other terminal methods, dispatch to specific handlers with 
response format
+       switch req.Method {
+       case string(mcp.MethodToolsList):
+               return f.handleToolsList(ctx, req, responseFormat)
+       case string(mcp.MethodResourcesList):
+               return f.handleResourcesList(ctx, req, responseFormat)
+       case string(mcp.MethodResourcesRead):
+               return f.handleResourceRead(ctx, req, responseFormat)
+       case "resources/templates/list":
+               return f.handleResourceTemplatesList(ctx, req, responseFormat)
+       case string(mcp.MethodPromptsList):
+               return f.handlePromptsList(ctx, req, responseFormat)
+       case string(mcp.MethodPromptsGet):
+               return f.handlePromptsGet(ctx, req, responseFormat)
+       case string(mcp.MethodPing):
+               return f.handlePing(ctx, req, responseFormat)
+       default:
+               logger.Warnf("[dubbo-go-pixiu] mcp server unsupported terminal 
method: %s", req.Method)
+               return f.errorHandler.SendMethodNotFound(ctx, req.ID)
+       }
+}
+
+// handleToolCallWithNegotiation handles tool calls with response format 
negotiation
+func (f *MCPServerFilter) handleToolCallWithNegotiation(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
+       // For notifications (no response needed), send 202 Accepted immediately
+       if responseFormat == transport.ResponseFormatAccepted {
+               ctx.SendLocalReply(http.StatusAccepted, nil)
+               return filter.Stop
+       }
+
+       // For tool calls, we need to forward to backend, so continue with 
existing logic
+       // but store the response format for use in Encode stage
+       return f.handleToolCall(ctx, req)
+}
+
+// sendResponseWithFormat sends response in the negotiated format
+func (f *MCPServerFilter) sendResponseWithFormat(ctx *MCPContext, response 
any, format transport.ResponseFormat) filter.FilterStatus {
+       switch format {
+       case transport.ResponseFormatJSON:
+               return f.sendJSONResponse(ctx, response)
+       case transport.ResponseFormatSSE:
+               return f.sendSSEResponse(ctx, response)
+       case transport.ResponseFormatAccepted:
+               ctx.SendLocalReply(http.StatusAccepted, nil)
+               return filter.Stop
+       default:
+               return f.sendJSONResponse(ctx, response)
+       }
+}
+
+// sendSSEResponse sends response via SSE stream
+func (f *MCPServerFilter) sendSSEResponse(ctx *MCPContext, response any) 
filter.FilterStatus {
+       sessionID := ctx.SessionID()
+       if sessionID == "" {
+               // No session, fall back to JSON
+               return f.sendJSONResponse(ctx, response)
+       }
+
+       session, exists := f.sessionManager.Session(sessionID)
+       if !exists {
+               // Session not found, fall back to JSON
+               logger.Warnf("[dubbo-go-pixiu] mcp server session not found: 
%s, falling back to JSON", sessionID)
+               return f.sendJSONResponse(ctx, response)
+       }
+
+       // Send via SSE
+       if err := f.sseHandler.SendSSEMessage(session, response); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] mcp server failed to send SSE 
message: %v", err)
+               // Fall back to JSON
+               return f.sendJSONResponse(ctx, response)
+       }
+
+       // SSE message sent successfully, return 202 Accepted per MCP spec
+       logger.Debugf("[dubbo-go-pixiu] mcp server sent response via SSE for 
session: %s", sessionID)
+       ctx.SendLocalReply(http.StatusAccepted, nil)
+       return filter.Stop
+}
+
+// sessionExists checks if a session exists
+func (f *MCPServerFilter) sessionExists(sessionID string) bool {
+       _, exists := f.sessionManager.Session(sessionID)
+       return exists
+}
+
+// sendBadRequest sends a 400 Bad Request response
+func (f *MCPServerFilter) sendBadRequest(ctx *MCPContext, message string) 
filter.FilterStatus {
+       logger.Warnf("[dubbo-go-pixiu] mcp server bad request: %s", message)
+       ctx.SendLocalReply(http.StatusBadRequest, []byte(message))
+       return filter.Stop
+}
+
+// sendMethodNotAllowed sends a 405 Method Not Allowed response
+func (f *MCPServerFilter) sendMethodNotAllowed(ctx *MCPContext) 
filter.FilterStatus {
+       logger.Warnf("[dubbo-go-pixiu] mcp server method not allowed: %s", 
ctx.Request.Method)
+       ctx.Writer.Header().Set("Allow", "GET, POST")
+       ctx.SendLocalReply(http.StatusMethodNotAllowed, []byte("Method Not 
Allowed"))
+       return filter.Stop
+}
+
+// sendNotAcceptable sends a 406 Not Acceptable response
+func (f *MCPServerFilter) sendNotAcceptable(ctx *MCPContext, message string) 
filter.FilterStatus {
+       logger.Warnf("[dubbo-go-pixiu] mcp server not acceptable: %s", message)
+       ctx.SendLocalReply(http.StatusNotAcceptable, []byte(message))
+       return filter.Stop
+}
+
+// sendInternalError sends a 500 Internal Server Error response
+func (f *MCPServerFilter) sendInternalError(ctx *MCPContext, message string) 
filter.FilterStatus {
+       logger.Errorf("[dubbo-go-pixiu] mcp server internal error: %s", message)
+       ctx.SendLocalReply(http.StatusInternalServerError, []byte("Internal 
Server Error"))
+       return filter.Stop
+}
+
+// maintainSSEPipe maintains the SSE pipe connection with keepalive
+func (f *MCPServerFilter) maintainSSEPipe(ctx *MCPContext, session 
*transport.MCPSession) {
+       // Ensure cleanup on exit
+       defer func() {
+               if session.PipeWriter != nil {
+                       session.PipeWriter.Close()
+               }
+               f.sessionManager.RemoveSession(session.ID)
+               logger.Debugf("[dubbo-go-pixiu] mcp server SSE pipe maintenance 
ended for session: %s", session.ID)
+       }()
+
+       ticker := time.NewTicker(transport.KeepaliveInterval)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       // Send keepalive comment (ignored by SSE clients)
+                       keepalive := 
f.sseHandler.FormatSSEKeepalive(time.Now().Unix())
+                       if _, err := 
session.PipeWriter.Write([]byte(keepalive)); err != nil {
+                               logger.Warnf("[dubbo-go-pixiu] mcp server 
keepalive write failed for session %s: %v", session.ID, err)
+                               return
+                       }
+                       session.LastActivity = time.Now()
+                       logger.Debugf("[dubbo-go-pixiu] mcp server sent 
keepalive for session: %s", session.ID)
+
+               case <-session.Done:
+                       // Server-initiated close
+                       logger.Infof("[dubbo-go-pixiu] mcp server closing SSE 
stream (server initiated) for session: %s", session.ID)
+                       return
+
+               case <-ctx.Ctx.Done():
+                       // Client disconnected
+                       logger.Infof("[dubbo-go-pixiu] mcp server closing SSE 
stream (client disconnected) for session: %s", session.ID)
+                       return
+               }
+       }
+}
+
+// SendServerNotification sends a server notification to the client via SSE 
stream
+func (f *MCPServerFilter) SendServerNotification(sessionID string, method 
string, params map[string]any) error {
+       session, exists := f.sessionManager.Session(sessionID)
+       if !exists {
+               return fmt.Errorf("session not found: %s", sessionID)
+       }
+
+       if session.PipeWriter == nil {
+               return fmt.Errorf("SSE pipe not established for session: %s", 
sessionID)
+       }
+
+       // Use ResponseBuilder to create notification
+       notification := f.responseBuilder.ServerNotification(method, params)
+
+       // Send via SSE
+       if err := f.sendMessageToSSEPipe(session, notification); err != nil {
+               return err
+       }
+
+       logger.Debugf("[dubbo-go-pixiu] mcp server sent notification to session 
%s: %s", sessionID, method)
+       return nil
+}
+
+// SendServerRequest sends a server request to the client via SSE stream
+func (f *MCPServerFilter) SendServerRequest(sessionID string, id any, method 
string, params any) error {
+       session, exists := f.sessionManager.Session(sessionID)
+       if !exists {
+               return fmt.Errorf("session not found: %s", sessionID)
+       }
+
+       if session.PipeWriter == nil {
+               return fmt.Errorf("SSE pipe not established for session: %s", 
sessionID)
+       }
+
+       // Use ResponseBuilder to create request
+       request := f.responseBuilder.ServerRequest(id, method, params)
+
+       // Send via SSE
+       if err := f.sendMessageToSSEPipe(session, request); err != nil {
+               return err
+       }
+
+       logger.Debugf("[dubbo-go-pixiu] mcp server sent request to session %s: 
%s (id: %v)", sessionID, method, id)
+       return nil
+}
+
+// sendMessageToSSEPipe sends a message to the SSE pipe
+func (f *MCPServerFilter) sendMessageToSSEPipe(session *transport.MCPSession, 
message any) error {
+       messageJSON, err := json.Marshal(message)
+       if err != nil {
+               return fmt.Errorf("failed to marshal message: %w", err)
+       }
+
+       // Use SSEHandler to format the message
+       sseData := f.sseHandler.FormatSSEMessage(string(messageJSON))
+
+       if _, err := session.PipeWriter.Write([]byte(sseData)); err != nil {
+               return fmt.Errorf("failed to write to SSE pipe: %w", err)
+       }
+
+       session.LastActivity = time.Now()
+       return nil
+}
diff --git a/pkg/filter/mcp/mcpserver/filter_sse_test.go 
b/pkg/filter/mcp/mcpserver/filter_sse_test.go
new file mode 100644
index 00000000..c42474ad
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/filter_sse_test.go
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+       "context"
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "testing"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func TestHandleGetRequest_SSEStream(t *testing.T) {
+       // Create filter
+       mcpFilter := createTestFilter(t)
+
+       // Create GET request
+       req := httptest.NewRequest(constant.Get, "/mcp", nil)
+       req.Header.Set(constant.HeaderKeyAccept, 
constant.HeaderValueTextEventStream)
+       req.Header.Set(constant.HeaderKeyMCPProtocolVersion, 
constant.MCPProtocolVersion20250618)
+
+       recorder := httptest.NewRecorder()
+       ctx := createTestContext(req, recorder)
+       mcpCtx := NewMCPContext(ctx)
+
+       // Parse headers
+       mcpCtx.ParseAndSetProtocolVersionHeader()
+       mcpCtx.ParseAndSetSessionHeader()
+       mcpCtx.ParseAndSetAcceptHeader()
+
+       // Execute handleGetRequest
+       status := mcpFilter.handleGetRequest(mcpCtx)
+
+       // Verify filter status
+       if status != filter.Stop {
+               t.Errorf("Expected filter.Stop, got %v", status)
+       }
+
+       // Verify SourceResp is set
+       if ctx.SourceResp == nil {
+               t.Fatal("SourceResp should be set")
+       }
+
+       httpResp, ok := ctx.SourceResp.(*http.Response)
+       if !ok {
+               t.Fatal("SourceResp should be *http.Response")
+       }
+
+       // Verify response headers
+       if httpResp.StatusCode != http.StatusOK {
+               t.Errorf("Expected status 200, got %d", httpResp.StatusCode)
+       }
+
+       contentType := httpResp.Header.Get(constant.HeaderKeyContextType)
+       if contentType != constant.HeaderValueTextEventStream {
+               t.Errorf("Expected Content-Type %s, got %s", 
constant.HeaderValueTextEventStream, contentType)
+       }
+
+       sessionID := httpResp.Header.Get(constant.HeaderKeyMCPSessionId)
+       if sessionID == "" {
+               t.Error("Mcp-Session-Id should be set")
+       }
+
+       // Verify session was created
+       session, exists := mcpFilter.sessionManager.Session(sessionID)
+       if !exists {
+               t.Error("Session should be created")
+       }
+       if session.PipeWriter == nil {
+               t.Error("Session PipeWriter should be set")
+       }
+
+       // Verify response body is pipe reader
+       if httpResp.Body == nil {
+               t.Error("Response body should be set")
+       }
+
+       // Cleanup
+       mcpFilter.sessionManager.Stop()
+}
+
+func TestHandleGetRequest_MissingAcceptHeader(t *testing.T) {
+       mcpFilter := createTestFilter(t)
+       defer mcpFilter.sessionManager.Stop()
+
+       // Create GET request without Accept header
+       req := httptest.NewRequest(constant.Get, "/mcp", nil)
+       recorder := httptest.NewRecorder()
+       ctx := createTestContext(req, recorder)
+       mcpCtx := NewMCPContext(ctx)
+
+       mcpCtx.ParseAndSetAcceptHeader()
+
+       status := mcpFilter.handleGetRequest(mcpCtx)
+
+       // Should return error
+       if status != filter.Stop {
+               t.Error("Expected filter.Stop for missing Accept header")
+       }
+
+       // Check for error response
+       if recorder.Code != http.StatusNotAcceptable {
+               t.Errorf("Expected status 406, got %d", recorder.Code)
+       }
+}
+
+func TestHandleGetRequest_ResumeExistingSession(t *testing.T) {
+       mcpFilter := createTestFilter(t)
+       defer mcpFilter.sessionManager.Stop()
+
+       // Create first session
+       session1, _ := mcpFilter.sessionManager.EnsureSession("")
+       sessionID := session1.ID
+
+       // Create GET request with existing session ID
+       req := httptest.NewRequest(constant.Get, "/mcp", nil)
+       req.Header.Set(constant.HeaderKeyAccept, 
constant.HeaderValueTextEventStream)
+       req.Header.Set(constant.HeaderKeyMCPSessionId, sessionID)
+
+       recorder := httptest.NewRecorder()
+       ctx := createTestContext(req, recorder)
+       mcpCtx := NewMCPContext(ctx)
+
+       mcpCtx.ParseAndSetSessionHeader()
+       mcpCtx.ParseAndSetAcceptHeader()
+
+       status := mcpFilter.handleGetRequest(mcpCtx)
+
+       if status != filter.Stop {
+               t.Errorf("Expected filter.Stop, got %v", status)
+       }
+
+       // Verify same session is reused
+       session2, exists := mcpFilter.sessionManager.Session(sessionID)
+       if !exists {
+               t.Error("Session should exist")
+       }
+       if session1.ID != session2.ID {
+               t.Error("Should reuse existing session")
+       }
+}
+
+func TestMaintainSSEPipe_Keepalive(t *testing.T) {
+       mcpFilter := createTestFilter(t)
+       defer mcpFilter.sessionManager.Stop()
+
+       // Create session with pipe
+       session, _ := mcpFilter.sessionManager.EnsureSession("")
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+
+       // Create context with timeout
+       ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+       defer cancel()
+
+       httpCtx := &contexthttp.HttpContext{
+               Ctx: ctx,
+       }
+       mcpCtx := NewMCPContext(httpCtx)
+
+       // Start maintainSSEPipe with very short interval for testing
+       // Note: This test uses production KeepaliveInterval (30s), so we won't 
actually receive keepalive in 2s
+       go mcpFilter.maintainSSEPipe(mcpCtx, session)
+
+       // Read from pipe in background
+       dataCh := make(chan string, 10)
+       go func() {
+               buf := make([]byte, 1024)
+               for {
+                       n, err := pipeReader.Read(buf)
+                       if err != nil {
+                               return
+                       }
+                       if n > 0 {
+                               dataCh <- string(buf[:n])
+                       }
+               }
+       }()
+
+       // Wait a bit to see if we receive anything (we shouldn't with 30s 
interval)
+       select {
+       case data := <-dataCh:
+               // If we somehow received data (unlikely with 30s interval), 
verify it's keepalive
+               if !strings.HasPrefix(data, ":") {
+                       t.Errorf("Expected keepalive comment, got: %s", data)
+               }
+       case <-time.After(500 * time.Millisecond):
+               // Expected: no data yet due to 30s interval
+       }
+
+       // Cancel context to stop maintenance
+       cancel()
+       time.Sleep(100 * time.Millisecond)
+
+       // Verify session was cleaned up
+       _, exists := mcpFilter.sessionManager.Session(session.ID)
+       if exists {
+               t.Error("Session should be removed after context cancellation")
+       }
+}
+
+func TestSendServerNotification(t *testing.T) {
+       mcpFilter := createTestFilter(t)
+       defer mcpFilter.sessionManager.Stop()
+
+       // Create session with pipe
+       session, _ := mcpFilter.sessionManager.EnsureSession("")
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+       sessionID := session.ID
+
+       // Send notification in goroutine
+       params := map[string]any{
+               "message": "test notification",
+               "value":   123,
+       }
+
+       errCh := make(chan error, 1)
+       go func() {
+               errCh <- mcpFilter.SendServerNotification(sessionID, 
"notifications/test", params)
+       }()
+
+       // Read from pipe
+       buf := make([]byte, 1024)
+       n, err := pipeReader.Read(buf)
+       if err != nil {
+               t.Fatalf("Failed to read from pipe: %v", err)
+       }
+
+       received := string(buf[:n])
+
+       // Verify SSE format
+       if !strings.HasPrefix(received, "data:") {
+               t.Error("Should start with 'data:'")
+       }
+       if !strings.HasSuffix(received, "\n\n") {
+               t.Error("Should end with \\n\\n")
+       }
+       if !strings.Contains(received, "notifications/test") {
+               t.Error("Should contain method name")
+       }
+       if !strings.Contains(received, "jsonrpc") {
+               t.Error("Should contain jsonrpc field")
+       }
+
+       // Verify no error
+       if err := <-errCh; err != nil {
+               t.Errorf("SendServerNotification failed: %v", err)
+       }
+
+       pipeWriter.Close()
+       pipeReader.Close()
+}
+
+func TestSendServerNotification_NoSession(t *testing.T) {
+       mcpFilter := createTestFilter(t)
+       defer mcpFilter.sessionManager.Stop()
+
+       err := mcpFilter.SendServerNotification("non-existent-id", "test", 
map[string]any{})
+       if err == nil {
+               t.Error("Expected error for non-existent session")
+       }
+       if !strings.Contains(err.Error(), "session not found") {
+               t.Errorf("Unexpected error message: %v", err)
+       }
+}
+
+func TestSendServerRequest(t *testing.T) {
+       mcpFilter := createTestFilter(t)
+       defer mcpFilter.sessionManager.Stop()
+
+       // Create session with pipe
+       session, _ := mcpFilter.sessionManager.EnsureSession("")
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+       sessionID := session.ID
+
+       // Send request in goroutine
+       params := map[string]any{"key": "value"}
+
+       errCh := make(chan error, 1)
+       go func() {
+               errCh <- mcpFilter.SendServerRequest(sessionID, 123, 
"prompts/get", params)
+       }()
+
+       // Read from pipe
+       buf := make([]byte, 1024)
+       n, err := pipeReader.Read(buf)
+       if err != nil {
+               t.Fatalf("Failed to read from pipe: %v", err)
+       }
+
+       received := string(buf[:n])
+
+       // Verify SSE format
+       if !strings.HasPrefix(received, "data:") {
+               t.Error("Should start with 'data:'")
+       }
+       if !strings.Contains(received, "prompts/get") {
+               t.Error("Should contain method name")
+       }
+       if !strings.Contains(received, `"id"`) {
+               t.Error("Should contain id field")
+       }
+
+       // Verify no error
+       if err := <-errCh; err != nil {
+               t.Errorf("SendServerRequest failed: %v", err)
+       }
+
+       pipeWriter.Close()
+       pipeReader.Close()
+}
+
+// Note: TestValidateMCPProtocolVersion removed - version negotiation now 
happens
+// during initialize request/response per MCP spec, not at HTTP header level.
+
+// Helper functions
+
+func createTestFilter(t *testing.T) *MCPServerFilter {
+       cfg := &model.McpServerConfig{
+               ServerInfo: model.ServerInfo{
+                       Name:    "Test Server",
+                       Version: "1.0.0",
+               },
+               Endpoint: "/mcp",
+               Tools:    []model.ToolConfig{},
+       }
+
+       factory := &FilterFactory{cfg: cfg}
+       if err := factory.Apply(); err != nil {
+               t.Fatalf("Failed to apply filter factory: %v", err)
+       }
+
+       sessionManager := transport.NewSessionManager()
+       sseHandler := transport.NewSSEHandler(sessionManager)
+       contentNegotiator := transport.NewContentNegotiator()
+
+       return &MCPServerFilter{
+               cfg:               cfg,
+               registry:          factory.registry,
+               errorHandler:      NewErrorHandler(),
+               responseBuilder:   NewResponseBuilder(),
+               sessionManager:    sessionManager,
+               sseHandler:        sseHandler,
+               contentNegotiator: contentNegotiator,
+       }
+}
+
+func createTestContext(req *http.Request, recorder *httptest.ResponseRecorder) 
*contexthttp.HttpContext {
+       return &contexthttp.HttpContext{
+               Request: req,
+               Writer:  recorder,
+               Ctx:     context.Background(),
+               Params:  make(map[string]any),
+       }
+}
diff --git a/pkg/filter/mcp/mcpserver/globals.go 
b/pkg/filter/mcp/mcpserver/globals.go
new file mode 100644
index 00000000..4657d375
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/globals.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+       "sync"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// Global singletons for MCP server components
+var (
+       globalRegistry       *ToolRegistry
+       globalDynamic        *DynamicConsumer
+       globalSessionManager *transport.SessionManager
+
+       // sync.Once variables for thread-safe singleton initialization
+       registryOnce       sync.Once
+       dynamicOnce        sync.Once
+       sessionManagerOnce sync.Once
+)
+
+// GetOrInitRegistry returns the global tool registry singleton
+func GetOrInitRegistry() *ToolRegistry {
+       registryOnce.Do(func() {
+               globalRegistry = NewToolRegistry()
+               logger.Infof("[dubbo-go-pixiu] mcp server initialized global 
tool registry")
+       })
+       return globalRegistry
+}
+
+// GetOrInitDynamicConsumer returns the global dynamic consumer singleton
+func GetOrInitDynamicConsumer() *DynamicConsumer {
+       dynamicOnce.Do(func() {
+               reg := GetOrInitRegistry()
+               sm := GetOrInitSessionManager()
+               sseHandler := transport.NewSSEHandler(sm)
+               globalDynamic = NewDynamicConsumer(reg, sm, sseHandler)
+               logger.Infof("[dubbo-go-pixiu] mcp server initialized global 
dynamic consumer")
+       })
+       return globalDynamic
+}
+
+// GetOrInitSessionManager returns the global session manager singleton
+func GetOrInitSessionManager() *transport.SessionManager {
+       sessionManagerOnce.Do(func() {
+               globalSessionManager = transport.NewSessionManager()
+               logger.Infof("[dubbo-go-pixiu] mcp server initialized global 
session manager")
+       })
+       return globalSessionManager
+}
+
+// ResetGlobalState resets all global singletons (for testing)
+func ResetGlobalState() {
+       globalRegistry = nil
+       globalDynamic = nil
+       if globalSessionManager != nil {
+               globalSessionManager.Stop()
+       }
+       globalSessionManager = nil
+
+       registryOnce = sync.Once{}
+       dynamicOnce = sync.Once{}
+       sessionManagerOnce = sync.Once{}
+
+       logger.Debugf("[dubbo-go-pixiu] mcp server global state reset")
+}
diff --git a/pkg/filter/mcp/mcpserver/handlers.go 
b/pkg/filter/mcp/mcpserver/handlers.go
index 21f44e52..ae0a1f4c 100644
--- a/pkg/filter/mcp/mcpserver/handlers.go
+++ b/pkg/filter/mcp/mcpserver/handlers.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/dubbo-go-pixiu/pkg/client"
        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
@@ -50,13 +51,34 @@ const (
 
 // handleInitialize handles the initialize method
 func (f *MCPServerFilter) handleInitialize(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
-       // Build server capabilities using mcp-go structures
+       // Parse client's protocol version from request params
+       var initParams struct {
+               ProtocolVersion string `json:"protocolVersion"`
+               ClientInfo      struct {
+                       Name    string `json:"name"`
+                       Version string `json:"version"`
+               } `json:"clientInfo"`
+       }
+
+       if req.Params != nil {
+               if paramsBytes, err := json.Marshal(req.Params); err == nil {
+                       json.Unmarshal(paramsBytes, &initParams)
+               }
+       }
+
+       clientVersion := initParams.ProtocolVersion
+       if clientVersion == "" {
+               clientVersion = ctx.ProtocolVersion()
+       }
+
+       logger.Infof("[dubbo-go-pixiu] mcp server initialize: client=%s 
version=%s, server will respond with=%s",
+               initParams.ClientInfo.Name, clientVersion, 
constant.MCPProtocolVersion20250618)
+
        capabilities := mcp.ServerCapabilities{
                Tools: &struct {
                        ListChanged bool `json:"listChanged,omitempty"`
                }{
-                       // disable listChanged notifications (not implemented)
-                       ListChanged: false,
+                       ListChanged: true,
                },
                Resources: &struct {
                        Subscribe   bool `json:"subscribe,omitempty"`
@@ -72,26 +94,40 @@ func (f *MCPServerFilter) handleInitialize(ctx *MCPContext, 
req mcp.JSONRPCReque
                },
        }
 
-       // Build server info using mcp-go structures
        serverInfo := mcp.Implementation{
                Name:    f.cfg.ServerInfo.Name,
                Version: f.cfg.ServerInfo.Version,
        }
 
-       // Create initialization result using mcp-go API
        instructions := f.cfg.ServerInfo.Instructions
        if instructions == "" {
                instructions = "This MCP server provides API access through 
tools, documentation through resources, and AI assistance through prompts."
        }
-       result := mcp.NewInitializeResult(mcp.LATEST_PROTOCOL_VERSION, 
capabilities, serverInfo, instructions)
+       result := mcp.NewInitializeResult(constant.MCPProtocolVersion20250618, 
capabilities, serverInfo, instructions)
 
-       // Create JSON-RPC response
        response := f.responseBuilder.Success(req.ID, result)
+
+       // Per MCP spec: assign a session ID at initialization time for 
Streamable HTTP transport
+       // This enables clients to use the session for SSE-based responses
+       sessionIDHeader := ctx.SessionID()
+       session, _ := f.sessionManager.EnsureSession(sessionIDHeader)
+
+       // Add Mcp-Session-Id header to the response
+       ctx.AddHeader(constant.HeaderKeyMCPSessionId, session.ID)
+
+       logger.Infof("[dubbo-go-pixiu] mcp server created session for client: 
%s", session.ID)
+
        return f.sendJSONResponse(ctx, response)
 }
 
 // handleToolsList handles the tools/list method using mcp-go APIs
-func (f *MCPServerFilter) handleToolsList(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleToolsList(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
+       response := f.buildToolsListResponseObject(req)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
+}
+
+// buildToolsListResponseObject builds the tools/list response object (for SSE)
+func (f *MCPServerFilter) buildToolsListResponseObject(req mcp.JSONRPCRequest) 
mcp.JSONRPCResponse {
        // Read tools from registry to reflect dynamic updates
        toolCfgs := f.registry.ListTools()
        tools := make([]mcp.Tool, 0, len(toolCfgs))
@@ -122,10 +158,9 @@ func (f *MCPServerFilter) handleToolsList(ctx *MCPContext, 
req mcp.JSONRPCReques
        }
 
        // Build standard MCP tools list response using mcp-go structures
-       result := mcp.NewListToolsResult(tools, "") // empty cursor for no 
pagination
+       result := mcp.NewListToolsResult(tools, "")
 
-       response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+       return f.responseBuilder.Success(req.ID, result)
 }
 
 // buildToolParameterOptions builds the mcp.PropertyOption slice for a given 
tool argument
@@ -166,46 +201,47 @@ func (f *MCPServerFilter) buildToolParameterOptions(arg 
*model.ArgConfig) []mcp.
 }
 
 // handleResourcesList handles the resources/list method
-func (f *MCPServerFilter) handleResourcesList(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
-       // Get all resources
+func (f *MCPServerFilter) handleResourcesList(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
        mcpResources, err := f.registry.ToMCPResources()
        if err != nil {
                logger.Errorf("[dubbo-go-pixiu] mcp server failed to get MCP 
resources: %v", err)
                return f.errorHandler.SendInternalError(ctx, req.ID, "failed to 
get resources")
        }
 
-       // Build resources list response using mcp-go structures
-       result := mcp.NewListResourcesResult(mcpResources, "") // empty cursor 
for no pagination
+       response := f.buildResourcesListResponseObject(req, mcpResources)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
+}
 
-       response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+// buildResourcesListResponseObject builds the resources/list response object 
(for SSE)
+func (f *MCPServerFilter) buildResourcesListResponseObject(req 
mcp.JSONRPCRequest, mcpResources []mcp.Resource) mcp.JSONRPCResponse {
+       // Build resources list response using mcp-go structures
+       result := mcp.NewListResourcesResult(mcpResources, "")
+       return f.responseBuilder.Success(req.ID, result)
 }
 
 // handlePing handles the ping method
-func (f *MCPServerFilter) handlePing(ctx *MCPContext, req mcp.JSONRPCRequest) 
filter.FilterStatus {
-       logger.Debugf("[dubbo-go-pixiu] mcp server handling ping request")
-
-       // Simple ping response
-       result := map[string]any{}
+func (f *MCPServerFilter) handlePing(ctx *MCPContext, req mcp.JSONRPCRequest, 
responseFormat transport.ResponseFormat) filter.FilterStatus {
+       response := f.buildPingResponseObject(req)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
+}
 
-       response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+// buildPingResponseObject builds the ping response object (for SSE)
+func (f *MCPServerFilter) buildPingResponseObject(req mcp.JSONRPCRequest) 
mcp.JSONRPCResponse {
+       logger.Debugf("[dubbo-go-pixiu] mcp server handling ping request")
+       return f.responseBuilder.Success(req.ID, map[string]any{})
 }
 
 // handleNotificationsInitialized handles notifications/initialized 
notification
-func (f *MCPServerFilter) handleNotificationsInitialized(_ *MCPContext, _ 
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleNotificationsInitialized(ctx *MCPContext, _ 
mcp.JSONRPCRequest) filter.FilterStatus {
        logger.Debugf("[dubbo-go-pixiu] mcp server received initialized 
notification from client")
 
-       // Store client initialization state
-       // This notification indicates that the client has completed 
initialization
-       // and is ready to receive requests
-
-       // For notifications, we don't send a response, just return Stop
+       // Per MCP spec, notifications MUST return 202 Accepted with no body
+       ctx.SendLocalReply(http.StatusAccepted, nil)
        return filter.Stop
 }
 
 // handleResourceRead handles the resources/read method
-func (f *MCPServerFilter) handleResourceRead(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleResourceRead(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
        logger.Debugf("[dubbo-go-pixiu] mcp server handling resources/read")
 
        // Parse request parameters
@@ -245,11 +281,11 @@ func (f *MCPServerFilter) handleResourceRead(ctx 
*MCPContext, req mcp.JSONRPCReq
        }
 
        response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
 }
 
 // handleResourceTemplatesList handles the resources/templates/list method
-func (f *MCPServerFilter) handleResourceTemplatesList(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleResourceTemplatesList(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
        // Get all resource templates (parameterized resource patterns)
        mcpResourceTemplates, err := f.registry.ToMCPResourceTemplates()
        if err != nil {
@@ -263,11 +299,11 @@ func (f *MCPServerFilter) handleResourceTemplatesList(ctx 
*MCPContext, req mcp.J
        }
 
        response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
 }
 
 // handlePromptsList handles the prompts/list method
-func (f *MCPServerFilter) handlePromptsList(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handlePromptsList(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
        logger.Debugf("[dubbo-go-pixiu] mcp server handling prompts/list 
request")
 
        // Get all prompts
@@ -283,11 +319,11 @@ func (f *MCPServerFilter) handlePromptsList(ctx 
*MCPContext, req mcp.JSONRPCRequ
        }
 
        response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
 }
 
 // handlePromptsGet handles the prompts/get method
-func (f *MCPServerFilter) handlePromptsGet(ctx *MCPContext, req 
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handlePromptsGet(ctx *MCPContext, req 
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat) 
filter.FilterStatus {
        logger.Debugf("[dubbo-go-pixiu] mcp server handling prompts/get 
request")
 
        // Parse request parameters
@@ -327,7 +363,7 @@ func (f *MCPServerFilter) handlePromptsGet(ctx *MCPContext, 
req mcp.JSONRPCReque
        }
 
        response := f.responseBuilder.Success(req.ID, result)
-       return f.sendJSONResponse(ctx, response)
+       return f.sendResponseWithFormat(ctx, response, responseFormat)
 }
 
 // buildPromptMessages builds prompt messages with parameter replacement 
support
@@ -548,6 +584,25 @@ func (f *MCPServerFilter) processToolCallResponse(ctx 
*MCPContext, requestID any
 
 // sendMCPResponse sends an MCP response and updates the target response
 func (f *MCPServerFilter) sendMCPResponse(ctx *MCPContext, response 
mcp.JSONRPCResponse) filter.FilterStatus {
+       // Check if we should send via SSE (when session exists with active SSE 
stream)
+       sessionID := ctx.SessionID()
+       if sessionID != "" {
+               session, exists := f.sessionManager.Session(sessionID)
+               if exists && session.PipeWriter != nil {
+                       // Send via SSE stream
+                       if sseErr := f.sseHandler.SendSSEMessage(session, 
response); sseErr == nil {
+                               logger.Debugf("[dubbo-go-pixiu] mcp server sent 
tool call response via SSE for session: %s", sessionID)
+                               // Return 202 Accepted without body per MCP spec
+                               ctx.SendLocalReply(http.StatusAccepted, nil)
+                               return filter.Stop
+                       } else {
+                               // If SSE send failed, fall through to JSON 
response
+                               logger.Warnf("[dubbo-go-pixiu] mcp server SSE 
send failed, falling back to JSON: %v", sseErr)
+                       }
+               }
+       }
+
+       // Default: send as JSON response (no SSE stream available)
        mcpResponseBody, err := json.Marshal(response)
        if err != nil {
                logger.Errorf("[dubbo-go-pixiu] mcp server failed to marshal 
MCP response: %v", err)
@@ -559,9 +614,6 @@ func (f *MCPServerFilter) sendMCPResponse(ctx *MCPContext, 
response mcp.JSONRPCR
        ctx.StatusCode(http.StatusOK)
        ctx.AddHeader(constant.HeaderKeyContextType, 
constant.HeaderValueApplicationJson)
 
-       // Critical: Clear Content-Length header to prevent mismatch errors
-       ctx.ClearContentLengthHeader()
-
        logger.Debugf("[dubbo-go-pixiu] mcp server successfully wrapped backend 
response in MCP format")
        return filter.Continue
 }
diff --git a/pkg/filter/mcp/mcpserver/registry.go 
b/pkg/filter/mcp/mcpserver/registry.go
index 7c4f8937..dae4120c 100644
--- a/pkg/filter/mcp/mcpserver/registry.go
+++ b/pkg/filter/mcp/mcpserver/registry.go
@@ -38,11 +38,6 @@ type ToolRegistry struct {
        resources         map[string]model.ResourceConfig         // indexed by 
URI
        resourceTemplates map[string]model.ResourceTemplateConfig // indexed by 
name
        prompts           map[string]model.PromptConfig
-
-       // TODO: Dynamic update support - add when integrating with Nacos
-       // changeListeners   []ChangeListener              // change listeners
-       // nacosClient      nacos.ConfigClient            // Nacos config client
-       // serviceDiscovery nacos.NamingClient            // Nacos service 
discovery client
 }
 
 // NewToolRegistry creates a new tool registry
@@ -364,5 +359,3 @@ func (r *ToolRegistry) ToMCPPrompts() ([]map[string]any, 
error) {
 
        return mcpPrompts, nil
 }
-
-// TODO: Dynamic update functionality - implement when integrating with Nacos
diff --git a/pkg/filter/mcp/mcpserver/response.go 
b/pkg/filter/mcp/mcpserver/response.go
index 7612bb2b..8e34543d 100644
--- a/pkg/filter/mcp/mcpserver/response.go
+++ b/pkg/filter/mcp/mcpserver/response.go
@@ -151,3 +151,30 @@ func (eh *ErrorHandler) sendResponse(ctx *MCPContext, 
response any) filter.Filte
        ctx.SendLocalReply(http.StatusOK, responseBody)
        return filter.Stop
 }
+
+// ServerNotification creates a JSON-RPC notification from server
+func (rb *ResponseBuilder) ServerNotification(method string, params 
map[string]any) mcp.JSONRPCNotification {
+       notificationParams := mcp.NotificationParams{
+               AdditionalFields: params,
+       }
+
+       return mcp.JSONRPCNotification{
+               JSONRPC: mcp.JSONRPC_VERSION,
+               Notification: mcp.Notification{
+                       Method: method,
+                       Params: notificationParams,
+               },
+       }
+}
+
+// ServerRequest creates a JSON-RPC request from server
+func (rb *ResponseBuilder) ServerRequest(id any, method string, params any) 
mcp.JSONRPCRequest {
+       return mcp.JSONRPCRequest{
+               JSONRPC: mcp.JSONRPC_VERSION,
+               ID:      mcp.NewRequestId(id),
+               Params:  params,
+               Request: mcp.Request{
+                       Method: method,
+               },
+       }
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/content_negotiator.go 
b/pkg/filter/mcp/mcpserver/transport/content_negotiator.go
new file mode 100644
index 00000000..07d7f060
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/content_negotiator.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "strings"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+)
+
+// ResponseFormat represents the response format type
+type ResponseFormat int
+
+const (
+       ResponseFormatJSON     ResponseFormat = iota // Traditional JSON 
response
+       ResponseFormatSSE                            // Server-Sent Events 
response
+       ResponseFormatAccepted                       // 202 Accepted (no 
immediate response)
+)
+
+// ContentNegotiator handles HTTP content negotiation for MCP responses
+type ContentNegotiator struct{}
+
+// NewContentNegotiator creates a new content negotiator
+func NewContentNegotiator() *ContentNegotiator {
+       return &ContentNegotiator{}
+}
+
+// NegotiateResponse determines the appropriate response format based on 
Accept header and session state
+func (cn *ContentNegotiator) NegotiateResponse(acceptHeader string, hasSession 
bool) ResponseFormat {
+       _, supportsSSE := cn.parseAcceptHeader(acceptHeader)
+
+       // Per MCP spec: when a session exists and client accepts SSE, use SSE 
for consistent streaming
+       // Otherwise, default to JSON (for backward compatibility and when no 
session available)
+       if supportsSSE && hasSession {
+               return ResponseFormatSSE
+       }
+
+       return ResponseFormatJSON
+}
+
+// SupportsSSE checks if the Accept header includes text/event-stream
+func (cn *ContentNegotiator) SupportsSSE(acceptHeader string) bool {
+       _, supportsSSE := cn.parseAcceptHeader(acceptHeader)
+       return supportsSSE
+}
+
+// SupportsJSON checks if the Accept header includes application/json
+func (cn *ContentNegotiator) SupportsJSON(acceptHeader string) bool {
+       supportsJSON, _ := cn.parseAcceptHeader(acceptHeader)
+       return supportsJSON
+}
+
+// parseAcceptHeader parses the Accept header and returns support for JSON and 
SSE
+func (cn *ContentNegotiator) parseAcceptHeader(acceptHeader string) 
(supportsJSON, supportsSSE bool) {
+       if acceptHeader == "" {
+               // Default to JSON support for backward compatibility
+               return true, false
+       }
+
+       acceptHeader = strings.ToLower(acceptHeader)
+
+       // Check for JSON support
+       if strings.Contains(acceptHeader, constant.HeaderValueApplicationJson) 
||
+               strings.Contains(acceptHeader, 
constant.MediaTypeApplicationWild) ||
+               strings.Contains(acceptHeader, constant.MediaTypeWildcard) {
+               supportsJSON = true
+       }
+
+       // Check for SSE support
+       if strings.Contains(acceptHeader, constant.HeaderValueTextEventStream) 
||
+               strings.Contains(acceptHeader, constant.MediaTypeTextWild) ||
+               strings.Contains(acceptHeader, constant.MediaTypeWildcard) {
+               supportsSSE = true
+       }
+
+       return supportsJSON, supportsSSE
+}
+
+// GetPreferredContentType returns the Content-Type header value for the given 
format
+func (cn *ContentNegotiator) GetPreferredContentType(format ResponseFormat) 
string {
+       switch format {
+       case ResponseFormatSSE:
+               return constant.HeaderValueTextEventStream
+       case ResponseFormatJSON, ResponseFormatAccepted:
+               return constant.HeaderValueApplicationJson
+       default:
+               return constant.HeaderValueApplicationJson
+       }
+}
+
+// ValidateAcceptHeaderForMethod validates that the Accept header is 
appropriate for the HTTP method
+func (cn *ContentNegotiator) ValidateAcceptHeaderForMethod(method, 
acceptHeader string) error {
+       switch strings.ToUpper(method) {
+       case constant.Get:
+               // GET requests for SSE must accept text/event-stream
+               if !cn.SupportsSSE(acceptHeader) {
+                       return &AcceptHeaderError{
+                               Method:       method,
+                               AcceptHeader: acceptHeader,
+                               Required:     
constant.HeaderValueTextEventStream,
+                       }
+               }
+       case constant.Post:
+               // POST requests should accept both application/json and 
text/event-stream
+               supportsJSON, supportsSSE := cn.parseAcceptHeader(acceptHeader)
+               if !supportsJSON && !supportsSSE {
+                       return &AcceptHeaderError{
+                               Method:       method,
+                               AcceptHeader: acceptHeader,
+                               Required:     
constant.HeaderValueApplicationJson + " or " + 
constant.HeaderValueTextEventStream,
+                       }
+               }
+       }
+       return nil
+}
+
+// AcceptHeaderError represents an Accept header validation error
+type AcceptHeaderError struct {
+       Method       string
+       AcceptHeader string
+       Required     string
+}
+
+func (e *AcceptHeaderError) Error() string {
+       if e.AcceptHeader == "" {
+               return "missing Accept header for " + e.Method + " request, 
required: " + e.Required
+       }
+       return "invalid Accept header '" + e.AcceptHeader + "' for " + e.Method 
+ " request, required: " + e.Required
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/content_negotiator_test.go 
b/pkg/filter/mcp/mcpserver/transport/content_negotiator_test.go
new file mode 100644
index 00000000..43d66014
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/content_negotiator_test.go
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "testing"
+)
+
+import (
+       "github.com/mark3labs/mcp-go/mcp"
+)
+
+func TestNewContentNegotiator(t *testing.T) {
+       cn := NewContentNegotiator()
+       if cn == nil {
+               t.Fatal("NewContentNegotiator returned nil")
+       }
+}
+
+func TestParseAcceptHeader(t *testing.T) {
+       cn := NewContentNegotiator()
+
+       tests := []struct {
+               name         string
+               acceptHeader string
+               wantJSON     bool
+               wantSSE      bool
+       }{
+               {
+                       name:         "empty header defaults to JSON",
+                       acceptHeader: "",
+                       wantJSON:     true,
+                       wantSSE:      false,
+               },
+               {
+                       name:         "application/json only",
+                       acceptHeader: "application/json",
+                       wantJSON:     true,
+                       wantSSE:      false,
+               },
+               {
+                       name:         "text/event-stream only",
+                       acceptHeader: "text/event-stream",
+                       wantJSON:     false,
+                       wantSSE:      true,
+               },
+               {
+                       name:         "both formats",
+                       acceptHeader: "application/json, text/event-stream",
+                       wantJSON:     true,
+                       wantSSE:      true,
+               },
+               {
+                       name:         "wildcard accepts all",
+                       acceptHeader: "*/*",
+                       wantJSON:     true,
+                       wantSSE:      true,
+               },
+               {
+                       name:         "application wildcard",
+                       acceptHeader: "application/*",
+                       wantJSON:     true,
+                       wantSSE:      false,
+               },
+               {
+                       name:         "text wildcard",
+                       acceptHeader: "text/*",
+                       wantJSON:     false,
+                       wantSSE:      true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       gotJSON, gotSSE := cn.parseAcceptHeader(tt.acceptHeader)
+                       if gotJSON != tt.wantJSON {
+                               t.Errorf("parseAcceptHeader() JSON = %v, want 
%v", gotJSON, tt.wantJSON)
+                       }
+                       if gotSSE != tt.wantSSE {
+                               t.Errorf("parseAcceptHeader() SSE = %v, want 
%v", gotSSE, tt.wantSSE)
+                       }
+               })
+       }
+}
+
+func TestSupportsSSE(t *testing.T) {
+       cn := NewContentNegotiator()
+
+       tests := []struct {
+               name         string
+               acceptHeader string
+               want         bool
+       }{
+               {"text/event-stream", "text/event-stream", true},
+               {"with charset", "text/event-stream;charset=utf-8", true},
+               {"wildcard", "*/*", true},
+               {"text wildcard", "text/*", true},
+               {"JSON only", "application/json", false},
+               {"empty", "", false},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := cn.SupportsSSE(tt.acceptHeader); got != 
tt.want {
+                               t.Errorf("SupportsSSE() = %v, want %v", got, 
tt.want)
+                       }
+               })
+       }
+}
+
+func TestSupportsJSON(t *testing.T) {
+       cn := NewContentNegotiator()
+
+       tests := []struct {
+               name         string
+               acceptHeader string
+               want         bool
+       }{
+               {"application/json", "application/json", true},
+               {"with charset", "application/json;charset=utf-8", true},
+               {"wildcard", "*/*", true},
+               {"application wildcard", "application/*", true},
+               {"SSE only", "text/event-stream", false},
+               {"empty defaults to JSON", "", true},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := cn.SupportsJSON(tt.acceptHeader); got != 
tt.want {
+                               t.Errorf("SupportsJSON() = %v, want %v", got, 
tt.want)
+                       }
+               })
+       }
+}
+
+func TestNegotiateResponse(t *testing.T) {
+       cn := NewContentNegotiator()
+
+       tests := []struct {
+               name         string
+               acceptHeader string
+               method       string
+               hasSession   bool
+               want         ResponseFormat
+       }{
+               {
+                       name:         "no accept header defaults to JSON",
+                       acceptHeader: "",
+                       method:       "initialize",
+                       hasSession:   false,
+                       want:         ResponseFormatJSON,
+               },
+               {
+                       name:         "JSON only returns JSON",
+                       acceptHeader: "application/json",
+                       method:       "initialize",
+                       hasSession:   true,
+                       want:         ResponseFormatJSON,
+               },
+               {
+                       name:         "SSE only with session",
+                       acceptHeader: "text/event-stream",
+                       method:       "initialize",
+                       hasSession:   true,
+                       want:         ResponseFormatSSE, // Returns SSE if 
session exists and only SSE accepted
+               },
+               {
+                       name:         "tool call with both and session prefers 
SSE",
+                       acceptHeader: "application/json, text/event-stream",
+                       method:       string(mcp.MethodToolsCall),
+                       hasSession:   true,
+                       want:         ResponseFormatSSE,
+               },
+               {
+                       name:         "tool call without session uses JSON",
+                       acceptHeader: "application/json, text/event-stream",
+                       method:       string(mcp.MethodToolsCall),
+                       hasSession:   false,
+                       want:         ResponseFormatJSON,
+               },
+               {
+                       name:         "any method with both and session prefers 
SSE",
+                       acceptHeader: "application/json, text/event-stream",
+                       method:       string(mcp.MethodToolsList),
+                       hasSession:   true,
+                       want:         ResponseFormatSSE,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got := cn.NegotiateResponse(tt.acceptHeader, 
tt.hasSession)
+                       if got != tt.want {
+                               t.Errorf("NegotiateResponse() = %v, want %v", 
got, tt.want)
+                       }
+               })
+       }
+}
+
+func TestGetPreferredContentType(t *testing.T) {
+       cn := NewContentNegotiator()
+
+       tests := []struct {
+               name   string
+               format ResponseFormat
+               want   string
+       }{
+               {
+                       name:   "JSON format",
+                       format: ResponseFormatJSON,
+                       want:   "application/json",
+               },
+               {
+                       name:   "SSE format",
+                       format: ResponseFormatSSE,
+                       want:   "text/event-stream",
+               },
+               {
+                       name:   "Accepted format",
+                       format: ResponseFormatAccepted,
+                       want:   "application/json",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := cn.GetPreferredContentType(tt.format); got != 
tt.want {
+                               t.Errorf("GetPreferredContentType() = %v, want 
%v", got, tt.want)
+                       }
+               })
+       }
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager.go 
b/pkg/filter/mcp/mcpserver/transport/session_manager.go
new file mode 100644
index 00000000..1eee222e
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "crypto/rand"
+       "encoding/hex"
+       "io"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+const (
+       SessionTimeout    = 30 * time.Minute
+       CleanupInterval   = 5 * time.Minute
+       KeepaliveInterval = 30 * time.Second
+)
+
+// MCPSession represents an active MCP session
+type MCPSession struct {
+       ID           string
+       CreatedAt    time.Time
+       LastActivity time.Time
+       PipeWriter   *io.PipeWriter // Pipe writer for sending SSE messages
+       Done         chan struct{}
+}
+
+// SessionManager manages MCP sessions for SSE connections
+type SessionManager struct {
+       sessions map[string]*MCPSession
+       mu       sync.RWMutex
+       stopCh   chan struct{}
+       once     sync.Once
+}
+
+// NewSessionManager creates a new session manager
+func NewSessionManager() *SessionManager {
+       sm := &SessionManager{
+               sessions: make(map[string]*MCPSession),
+               stopCh:   make(chan struct{}),
+       }
+       go sm.startCleanupRoutine()
+       return sm
+}
+
+// EnsureSession gets existing session or creates new one
+func (sm *SessionManager) EnsureSession(sessionIDHeader string) (*MCPSession, 
bool) {
+       sm.mu.Lock()
+       defer sm.mu.Unlock()
+
+       // Try to get existing session
+       if sessionIDHeader != "" {
+               if session, exists := sm.sessions[sessionIDHeader]; exists {
+                       session.LastActivity = time.Now()
+                       return session, false // existing session
+               }
+       }
+
+       // Create new session
+       sessionID := sm.generateSessionID()
+       session := &MCPSession{
+               ID:           sessionID,
+               CreatedAt:    time.Now(),
+               LastActivity: time.Now(),
+               Done:         make(chan struct{}),
+       }
+
+       sm.sessions[sessionID] = session
+       logger.Infof("[dubbo-go-pixiu] mcp server created new session: %s", 
sessionID)
+       return session, true // new session
+}
+
+// Session retrieves a session by ID
+func (sm *SessionManager) Session(sessionID string) (*MCPSession, bool) {
+       sm.mu.RLock()
+       defer sm.mu.RUnlock()
+
+       session, exists := sm.sessions[sessionID]
+       if exists {
+               session.LastActivity = time.Now()
+       }
+       return session, exists
+}
+
+// RemoveSession removes a session and cleans up resources
+func (sm *SessionManager) RemoveSession(sessionID string) {
+       sm.mu.Lock()
+       defer sm.mu.Unlock()
+
+       if session, exists := sm.sessions[sessionID]; exists {
+               // Close Done channel to signal goroutines
+               close(session.Done)
+
+               // Close PipeWriter to end the SSE stream
+               if session.PipeWriter != nil {
+                       session.PipeWriter.Close()
+               }
+
+               delete(sm.sessions, sessionID)
+               logger.Infof("[dubbo-go-pixiu] mcp server removed session: %s", 
sessionID)
+       }
+}
+
+// Stop stops the session manager
+func (sm *SessionManager) Stop() {
+       sm.once.Do(func() {
+               close(sm.stopCh)
+
+               sm.mu.Lock()
+               for sessionID, session := range sm.sessions {
+                       close(session.Done)
+                       delete(sm.sessions, sessionID)
+               }
+               sm.mu.Unlock()
+       })
+}
+
+// generateSessionID generates a unique session ID
+func (sm *SessionManager) generateSessionID() string {
+       bytes := make([]byte, 16)
+       if _, err := rand.Read(bytes); err != nil {
+               // Fallback to timestamp-based ID
+               return hex.EncodeToString([]byte(time.Now().String()))
+       }
+       return hex.EncodeToString(bytes)
+}
+
+// startCleanupRoutine starts the session cleanup routine
+func (sm *SessionManager) startCleanupRoutine() {
+       ticker := time.NewTicker(CleanupInterval)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       sm.cleanupExpiredSessions()
+               case <-sm.stopCh:
+                       return
+               }
+       }
+}
+
+// cleanupExpiredSessions removes expired sessions
+func (sm *SessionManager) cleanupExpiredSessions() {
+       sm.mu.Lock()
+       defer sm.mu.Unlock()
+
+       now := time.Now()
+       var toRemove []string
+
+       for sessionID, session := range sm.sessions {
+               if now.Sub(session.LastActivity) > SessionTimeout {
+                       toRemove = append(toRemove, sessionID)
+               }
+       }
+
+       for _, sessionID := range toRemove {
+               if session, exists := sm.sessions[sessionID]; exists {
+                       close(session.Done)
+                       delete(sm.sessions, sessionID)
+                       logger.Infof("[dubbo-go-pixiu] mcp server cleaned up 
expired session: %s", sessionID)
+               }
+       }
+
+       if len(toRemove) > 0 {
+               logger.Debugf("[dubbo-go-pixiu] mcp server cleaned up %d 
expired sessions", len(toRemove))
+       }
+}
+
+// AllSessionIDs returns all active session IDs
+func (sm *SessionManager) AllSessionIDs() []string {
+       sm.mu.RLock()
+       defer sm.mu.RUnlock()
+
+       ids := make([]string, 0, len(sm.sessions))
+       for id := range sm.sessions {
+               ids = append(ids, id)
+       }
+       return ids
+}
+
+// ActiveSessionCount returns the number of active sessions
+func (sm *SessionManager) ActiveSessionCount() int {
+       sm.mu.RLock()
+       defer sm.mu.RUnlock()
+       return len(sm.sessions)
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager_test.go 
b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
new file mode 100644
index 00000000..93acc5c4
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "testing"
+       "time"
+)
+
+func TestNewSessionManager(t *testing.T) {
+       sm := NewSessionManager()
+       if sm == nil {
+               t.Fatal("NewSessionManager returned nil")
+       }
+       if sm.sessions == nil {
+               t.Error("sessions map not initialized")
+       }
+       if sm.stopCh == nil {
+               t.Error("stopCh not initialized")
+       }
+       sm.Stop()
+}
+
+func TestEnsureSession_CreateNew(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Test creating new session with empty header
+       session, isNew := sm.EnsureSession("")
+       if !isNew {
+               t.Error("Expected new session to be created")
+       }
+       if session == nil {
+               t.Fatal("Session should not be nil")
+       }
+       if session.ID == "" {
+               t.Error("Session ID should not be empty")
+       }
+       if session.Done == nil {
+               t.Error("Session Done channel should be initialized")
+       }
+       if len(session.ID) != 32 {
+               t.Errorf("Expected session ID length 32, got %d", 
len(session.ID))
+       }
+}
+
+func TestEnsureSession_ReuseExisting(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Create first session
+       session1, isNew1 := sm.EnsureSession("")
+       if !isNew1 {
+               t.Error("Expected new session")
+       }
+
+       // Try to get same session by ID
+       session2, isNew2 := sm.EnsureSession(session1.ID)
+       if isNew2 {
+               t.Error("Expected existing session, not new")
+       }
+       if session1.ID != session2.ID {
+               t.Error("Should return same session")
+       }
+
+       // Verify last activity was updated
+       if session2.LastActivity.Before(session1.LastActivity) {
+               t.Error("LastActivity should be updated")
+       }
+}
+
+func TestSession(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Create session
+       session1, _ := sm.EnsureSession("")
+       sessionID := session1.ID
+
+       // Retrieve session
+       session2, exists := sm.Session(sessionID)
+       if !exists {
+               t.Error("Session should exist")
+       }
+       if session2.ID != sessionID {
+               t.Error("Retrieved wrong session")
+       }
+
+       // Try non-existent session
+       _, exists = sm.Session("non-existent-id")
+       if exists {
+               t.Error("Non-existent session should not be found")
+       }
+}
+
+func TestRemoveSession(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Create session
+       session, _ := sm.EnsureSession("")
+       sessionID := session.ID
+
+       // Verify session exists
+       _, exists := sm.Session(sessionID)
+       if !exists {
+               t.Fatal("Session should exist before removal")
+       }
+
+       // Remove session
+       sm.RemoveSession(sessionID)
+
+       // Verify session removed
+       _, exists = sm.Session(sessionID)
+       if exists {
+               t.Error("Session should be removed")
+       }
+
+       // Verify Done channel closed
+       select {
+       case <-session.Done:
+               // Expected: channel should be closed
+       default:
+               t.Error("Done channel should be closed")
+       }
+
+       // Test removing non-existent session (should not panic)
+       sm.RemoveSession("non-existent-id")
+}
+
+func TestSessionCleanup(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Create session
+       session, _ := sm.EnsureSession("")
+       sessionID := session.ID
+
+       // Manually set old LastActivity to simulate timeout
+       sm.mu.Lock()
+       session.LastActivity = time.Now().Add(-SessionTimeout - 1*time.Minute)
+       sm.mu.Unlock()
+
+       // Trigger cleanup
+       sm.cleanupExpiredSessions()
+
+       // Verify session was cleaned up
+       _, exists := sm.Session(sessionID)
+       if exists {
+               t.Error("Expired session should be cleaned up")
+       }
+}
+
+func TestSessionManager_Stop(t *testing.T) {
+       sm := NewSessionManager()
+
+       // Create multiple sessions
+       session1, _ := sm.EnsureSession("")
+       session2, _ := sm.EnsureSession("")
+
+       // Stop manager
+       sm.Stop()
+
+       // Verify all sessions removed
+       _, exists1 := sm.Session(session1.ID)
+       _, exists2 := sm.Session(session2.ID)
+       if exists1 || exists2 {
+               t.Error("All sessions should be removed on Stop")
+       }
+
+       // Verify Done channels closed
+       select {
+       case <-session1.Done:
+               // Expected
+       default:
+               t.Error("Session1 Done should be closed")
+       }
+
+       select {
+       case <-session2.Done:
+               // Expected
+       default:
+               t.Error("Session2 Done should be closed")
+       }
+
+       // Verify stop channel closed
+       select {
+       case <-sm.stopCh:
+               // Expected
+       default:
+               t.Error("stopCh should be closed")
+       }
+}
+
+func TestGenerateSessionID(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Generate multiple session IDs
+       ids := make(map[string]bool)
+       for i := 0; i < 100; i++ {
+               id := sm.generateSessionID()
+               if id == "" {
+                       t.Error("Generated session ID should not be empty")
+               }
+               if len(id) != 32 {
+                       t.Errorf("Expected session ID length 32, got %d", 
len(id))
+               }
+               if ids[id] {
+                       t.Errorf("Duplicate session ID generated: %s", id)
+               }
+               ids[id] = true
+       }
+}
+
+func TestConcurrentSessionAccess(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       // Create initial session
+       session, _ := sm.EnsureSession("")
+       sessionID := session.ID
+
+       done := make(chan bool, 3)
+
+       // Concurrent reads
+       go func() {
+               for i := 0; i < 100; i++ {
+                       sm.Session(sessionID)
+                       time.Sleep(time.Millisecond)
+               }
+               done <- true
+       }()
+
+       // Concurrent writes (update LastActivity)
+       go func() {
+               for i := 0; i < 100; i++ {
+                       sm.Session(sessionID)
+                       time.Sleep(time.Millisecond)
+               }
+               done <- true
+       }()
+
+       // Concurrent session creation
+       go func() {
+               for i := 0; i < 10; i++ {
+                       sm.EnsureSession("")
+                       time.Sleep(10 * time.Millisecond)
+               }
+               done <- true
+       }()
+
+       // Wait for all goroutines
+       for i := 0; i < 3; i++ {
+               <-done
+       }
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/sse_handler.go 
b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
new file mode 100644
index 00000000..166a9f95
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "encoding/json"
+       "fmt"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// SSEHandler provides utility functions for SSE message formatting
+type SSEHandler struct {
+       sessionManager *SessionManager
+}
+
+// NewSSEHandler creates a new SSE handler
+func NewSSEHandler(sessionManager *SessionManager) *SSEHandler {
+       return &SSEHandler{
+               sessionManager: sessionManager,
+       }
+}
+
+// SendSSEMessage sends a message through the SSE pipe
+func (h *SSEHandler) SendSSEMessage(session *MCPSession, message any) error {
+       if session.PipeWriter == nil {
+               return fmt.Errorf("SSE pipe not established")
+       }
+
+       // Marshal message to JSON
+       messageJSON, err := json.Marshal(message)
+       if err != nil {
+               return fmt.Errorf("failed to marshal SSE message: %w", err)
+       }
+
+       // Format as SSE event
+       sseData := h.FormatSSEMessage(string(messageJSON))
+
+       // Write to pipe
+       if _, err := session.PipeWriter.Write([]byte(sseData)); err != nil {
+               logger.Errorf("[dubbo-go-pixiu] mcp server failed to send SSE 
message: %v", err)
+               return fmt.Errorf("failed to write to SSE pipe: %w", err)
+       }
+
+       session.LastActivity = time.Now()
+       logger.Debugf("[dubbo-go-pixiu] mcp server sent SSE message to session: 
%s", session.ID)
+       return nil
+}
+
+// FormatSSEMessage formats a message as SSE data
+func (h *SSEHandler) FormatSSEMessage(messageJSON string) string {
+       return fmt.Sprintf("data: %s\n\n", messageJSON)
+}
+
+// FormatSSEEvent formats a custom SSE event with event type
+func (h *SSEHandler) FormatSSEEvent(eventType, data string) string {
+       if eventType != "" {
+               return fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, data)
+       }
+       return fmt.Sprintf("data: %s\n\n", data)
+}
+
+// FormatSSEEventWithID formats a SSE event with ID for resumability
+func (h *SSEHandler) FormatSSEEventWithID(eventID int64, data string) string {
+       return fmt.Sprintf("id: %d\ndata: %s\n\n", eventID, data)
+}
+
+// FormatSSEKeepalive formats a SSE keepalive comment
+func (h *SSEHandler) FormatSSEKeepalive(timestamp int64) string {
+       return fmt.Sprintf(": keepalive %d\n\n", timestamp)
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/sse_handler_test.go 
b/pkg/filter/mcp/mcpserver/transport/sse_handler_test.go
new file mode 100644
index 00000000..b1246d8f
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/sse_handler_test.go
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+       "io"
+       "testing"
+       "time"
+)
+
+func TestNewSSEHandler(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+
+       handler := NewSSEHandler(sm)
+       if handler == nil {
+               t.Fatal("NewSSEHandler returned nil")
+       }
+       if handler.sessionManager == nil {
+               t.Error("sessionManager not initialized")
+       }
+}
+
+func TestFormatSSEMessage(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       tests := []struct {
+               name     string
+               input    string
+               expected string
+       }{
+               {
+                       name:     "simple message",
+                       input:    `{"jsonrpc":"2.0","method":"test"}`,
+                       expected: "data: 
{\"jsonrpc\":\"2.0\",\"method\":\"test\"}\n\n",
+               },
+               {
+                       name:     "empty message",
+                       input:    "",
+                       expected: "data: \n\n",
+               },
+               {
+                       name:     "message with newlines",
+                       input:    "line1\nline2",
+                       expected: "data: line1\nline2\n\n",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := handler.FormatSSEMessage(tt.input)
+                       if result != tt.expected {
+                               t.Errorf("FormatSSEMessage() = %q, want %q", 
result, tt.expected)
+                       }
+               })
+       }
+}
+
+func TestFormatSSEEvent(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       tests := []struct {
+               name      string
+               eventType string
+               data      string
+               expected  string
+       }{
+               {
+                       name:      "with event type",
+                       eventType: "message",
+                       data:      "test data",
+                       expected:  "event: message\ndata: test data\n\n",
+               },
+               {
+                       name:      "without event type",
+                       eventType: "",
+                       data:      "test data",
+                       expected:  "data: test data\n\n",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := handler.FormatSSEEvent(tt.eventType, tt.data)
+                       if result != tt.expected {
+                               t.Errorf("FormatSSEEvent() = %q, want %q", 
result, tt.expected)
+                       }
+               })
+       }
+}
+
+func TestFormatSSEEventWithID(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       result := handler.FormatSSEEventWithID(123, "test data")
+       expected := "id: 123\ndata: test data\n\n"
+       if result != expected {
+               t.Errorf("FormatSSEEventWithID() = %q, want %q", result, 
expected)
+       }
+}
+
+func TestFormatSSEKeepalive(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       timestamp := time.Now().Unix()
+       result := handler.FormatSSEKeepalive(timestamp)
+
+       // Just check format is correct (contains keepalive and timestamp)
+       if len(result) < 10 {
+               t.Error("Keepalive format too short")
+       }
+       if result[0] != ':' {
+               t.Error("Keepalive should start with ':'")
+       }
+       if result[len(result)-2:] != "\n\n" {
+               t.Error("Keepalive should end with \\n\\n")
+       }
+}
+
+func TestSendSSEMessage(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       // Create session with pipe
+       session, _ := sm.EnsureSession("")
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+
+       // Send message in goroutine
+       message := map[string]any{
+               "jsonrpc": "2.0",
+               "method":  "test",
+       }
+
+       errCh := make(chan error, 1)
+       go func() {
+               errCh <- handler.SendSSEMessage(session, message)
+       }()
+
+       // Read from pipe
+       buf := make([]byte, 1024)
+       n, err := pipeReader.Read(buf)
+       if err != nil {
+               t.Fatalf("Failed to read from pipe: %v", err)
+       }
+
+       received := string(buf[:n])
+       if len(received) == 0 {
+               t.Error("Should receive data")
+       }
+       if received[:5] != "data:" {
+               t.Error("Should start with 'data:'")
+       }
+       if received[len(received)-2:] != "\n\n" {
+               t.Error("Should end with \\n\\n")
+       }
+
+       // Check send error
+       if err := <-errCh; err != nil {
+               t.Errorf("SendSSEMessage failed: %v", err)
+       }
+
+       pipeWriter.Close()
+       pipeReader.Close()
+}
+
+func TestSendSSEMessage_NoPipe(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       // Create session without pipe
+       session, _ := sm.EnsureSession("")
+
+       message := map[string]any{"test": "data"}
+       err := handler.SendSSEMessage(session, message)
+       if err == nil {
+               t.Error("Expected error when PipeWriter is nil")
+       }
+       if err.Error() != "SSE pipe not established" {
+               t.Errorf("Unexpected error message: %v", err)
+       }
+}
+
+func TestSendSSEMessage_InvalidJSON(t *testing.T) {
+       sm := NewSessionManager()
+       defer sm.Stop()
+       handler := NewSSEHandler(sm)
+
+       session, _ := sm.EnsureSession("")
+       pipeReader, pipeWriter := io.Pipe()
+       session.PipeWriter = pipeWriter
+       defer pipeReader.Close()
+       defer pipeWriter.Close()
+
+       // Try to send invalid message (channel cannot be marshaled)
+       invalidMessage := make(chan int)
+       err := handler.SendSSEMessage(session, invalidMessage)
+       if err == nil {
+               t.Error("Expected error when marshaling invalid message")
+       }
+}


Reply via email to