This is an automated email from the ASF dual-hosted git repository.
wuxinfan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new d3da9280 feat(MCP): enhance streamable http capability (#769)
d3da9280 is described below
commit d3da92806f7c6efe05d64452d53334df9b537da5
Author: Zerui Yang <[email protected]>
AuthorDate: Thu Oct 30 15:02:29 2025 +0800
feat(MCP): enhance streamable http capability (#769)
* feat(mcp): implement Nacos client and MCP server configuration management
* feat(nacos): add MCP controller and provider registration for Nacos
integration
* feat(mcp): enhance Nacos integration with improved tool registry and
configuration synchronization
* fix: rollback
* fix: update dependencies for Nacos integration and improve compatibility
* chore: add Apache License header to multiple files
* fix: update dependencies for improved compatibility and performance
* fix: refactor URL parsing to use constants and add test
* fix: implement thread-safe singleton initialization for ToolRegistry and
DynamicConsumer
* fix: update Credentials type to use 'any' for improved flexibility
* fix: add unit tests for dynamic consumer and tool registry with
concurrency support
* fix: update resetNacosTemplateConfigs to clarify its role in resetting
credential configurations
* fix: update types to use 'any' for improved flexibility in Nacos
configuration
* fix: enhance DynamicConsumer with debounce functionality and fingerprint
calculation
* fix: update GetDebounceInfo to use 'any' for improved type flexibility
* fix: reorganize imports in dynamic.go for better structure
* fix: refactor URL parsing logic and enhance error handling in adapter and
registry components
* test: add unit tests for URL parsing and path extraction functions
* fix: standardize root path return value in ExtractPathFromURL and
ReplaceGoTemplateArgsInPath functions
* fix: update test case to use example.com for host:port format
* fix: update MCP controller and dynamic consumer to handle server ID in
configuration updates
* fix: update MCP controller and dynamic consumer to handle server ID in
configuration updates
* fix: update dependencies in go.mod and go.sum files
* fix: add validation for Nacos addresses and improve logging in MCP adapter
* fix: update go.mod and go.sum to include new Alibaba Cloud SDK
dependencies
* feat: add documentation to implement streamable HTTP enhancements
* feat: implement content negotiation and session management for MCP SSE
support
* fix: refactor MCP server components for improved session management and
SSE handling
* fix: simplify content negotiation for SSE responses in MCP server
* fix: remove doc
* refactor: organize imports and ensure newline at end of files
* fix: update dependencies in go.mod and go.sum
* fix: remove unused global variables and singleton functions from
dynamic.go
* Revert "fix: correct Pull bot user identification and enhance PR body
formatting"
This reverts commit 40646cbf
* fix: correct Pull bot user identification in sync-to-upstream.yml
* fix: update NewDynamicConsumer to include SSEHandler and refactor SSE
message formatting
* fix: reorganize import statements in dynamic.go, dynamic_test.go, and
globals.go
* fix: replace strings.Replace with strings.ReplaceAll for improved
readability in client_test.go
* fix: reorganize import statements in client_test.go for improved clarity
* rollback: back sync file
---
.github/workflows/sync-to-upstream.yml | 41 +-
go.mod | 26 +-
go.sum | 56 +--
.../mcpserver/registry/nacos/client_test.go | 117 ++----
pkg/adapter/mcpserver/registrycenter.go | 2 +-
pkg/common/constant/http.go | 18 +
pkg/filter/mcp/mcpserver/context.go | 106 ++++++
pkg/filter/mcp/mcpserver/dynamic.go | 107 ++++--
.../mcp/mcpserver/dynamic_notification_test.go | 181 +++++++++
pkg/filter/mcp/mcpserver/dynamic_test.go | 65 ++--
pkg/filter/mcp/mcpserver/filter.go | 424 ++++++++++++++++++---
pkg/filter/mcp/mcpserver/filter_sse_test.go | 382 +++++++++++++++++++
pkg/filter/mcp/mcpserver/globals.go | 85 +++++
pkg/filter/mcp/mcpserver/handlers.go | 134 +++++--
pkg/filter/mcp/mcpserver/registry.go | 7 -
pkg/filter/mcp/mcpserver/response.go | 27 ++
.../mcp/mcpserver/transport/content_negotiator.go | 146 +++++++
.../mcpserver/transport/content_negotiator_test.go | 247 ++++++++++++
.../mcp/mcpserver/transport/session_manager.go | 206 ++++++++++
.../mcpserver/transport/session_manager_test.go | 272 +++++++++++++
pkg/filter/mcp/mcpserver/transport/sse_handler.go | 89 +++++
.../mcp/mcpserver/transport/sse_handler_test.go | 226 +++++++++++
22 files changed, 2663 insertions(+), 301 deletions(-)
diff --git a/.github/workflows/sync-to-upstream.yml
b/.github/workflows/sync-to-upstream.yml
index 3b1fe8d1..aaad4276 100644
--- a/.github/workflows/sync-to-upstream.yml
+++ b/.github/workflows/sync-to-upstream.yml
@@ -21,11 +21,11 @@ name: Sync to Upstream
# Automatically creates a PR to upstream when a PR is merged to fork.
# Only executes in fork repositories (checked by github.repository).
-#
+#
# Prerequisites:
# - UPSTREAM_GITHUB_TOKEN secret with 'public_repo' permission
# - Bot account with Write access to fork repository
-#
+#
# Configuration: Modify the env section below for your project
# ============================================================================
@@ -35,14 +35,14 @@ env:
# Upstream repository
UPSTREAM_ORG: apache
UPSTREAM_REPO: dubbo-go-pixiu
-
+
# Fork repository
FORK_ORG: dubbo-go-pixiu
FORK_REPO: dubbo-go-pixiu
-
+
# Branch name
BASE_BRANCH: develop
-
+
# Git bot info
BOT_NAME: "Pixiu Bot"
BOT_EMAIL: "[email protected]"
@@ -66,10 +66,10 @@ jobs:
sync-to-upstream:
name: Sync to Upstream Repository
runs-on: ubuntu-latest
-
+
# Only run when PR is merged; repository check happens inside steps where
env is available
if: github.event.pull_request.merged == true
-
+
steps:
- name: Check repository and target branch
id: check_branch
@@ -101,21 +101,21 @@ jobs:
fetch-depth: 0
ref: ${{ env.BASE_BRANCH }}
token: ${{ secrets.UPSTREAM_GITHUB_TOKEN }}
-
+
# Step 2: Configure Git user
- name: Configure Git user
if: steps.check_branch.outputs.skip != 'true'
run: |
git config user.name "${{ env.BOT_NAME }}"
git config user.email "${{ env.BOT_EMAIL }}"
-
+
# Step 3: Add upstream remote and fetch
- name: Add upstream remote
if: steps.check_branch.outputs.skip != 'true'
run: |
git remote add upstream https://github.com/${{ env.UPSTREAM_ORG
}}/${{ env.UPSTREAM_REPO }}.git
git fetch upstream ${{ env.BASE_BRANCH }}
-
+
# Step 4: Create sync branch with timestamp
- name: Create sync branch
if: steps.check_branch.outputs.skip != 'true'
@@ -125,20 +125,20 @@ jobs:
echo "SYNC_BRANCH=${SYNC_BRANCH}" >> $GITHUB_ENV
git checkout -b ${SYNC_BRANCH}
echo "branch=${SYNC_BRANCH}" >> $GITHUB_OUTPUT
-
+
# Step 5: Rebase onto upstream base branch
- name: Rebase onto upstream
if: steps.check_branch.outputs.skip != 'true'
id: rebase
run: |
git rebase upstream/${{ env.BASE_BRANCH }}
-
+
# Step 6: Push sync branch to origin
- name: Push sync branch
if: steps.check_branch.outputs.skip != 'true'
run: |
git push origin ${SYNC_BRANCH} --force-with-lease
-
+
# Step 7: Generate PR body with attribution
- name: Generate PR description
if: steps.check_branch.outputs.skip != 'true'
@@ -176,7 +176,7 @@ jobs:
echo ""
echo "cc @${ORIGINAL_AUTHOR}"
} > pr_body.md
-
+
- name: Create PR to ${{ env.UPSTREAM_ORG }}/${{ env.UPSTREAM_REPO }}
if: steps.check_branch.outputs.skip != 'true'
id: create_pr
@@ -193,7 +193,7 @@ jobs:
echo "pr_url=${PR_URL}" >> $GITHUB_OUTPUT
echo "✅ Successfully created PR: ${PR_URL}"
-
+
- name: Notify original PR
if: success() && steps.check_branch.outputs.skip != 'true'
env:
@@ -214,7 +214,7 @@ jobs:
echo "⚠️ Comment failed but sync succeeded: $UPSTREAM_PR_URL"
exit 0
}
-
+
- name: Handle rebase failure
if: failure() && steps.rebase.outcome == 'failure'
env:
@@ -228,13 +228,13 @@ jobs:
--repo ${{ env.FORK_ORG }}/${{ env.FORK_REPO }} \
--title "⚠️ Failed to auto-sync PR #${PR_NUMBER} to upstream" \
--body "## Sync Failure Report
-
+
**Original PR**: #${PR_NUMBER}
**Author**: @${PR_AUTHOR}
**Error**: Rebase conflicts detected
-
+
### Manual Resolution Required
-
+
\`\`\`bash
git checkout ${{ env.BASE_BRANCH }}
git checkout -b manual-sync-${PR_NUMBER}
@@ -245,9 +245,8 @@ jobs:
git push origin manual-sync-${PR_NUMBER}
# Create PR to ${{ env.UPSTREAM_ORG }}/${{ env.UPSTREAM_REPO }}
\`\`\`
-
+
cc @${PR_AUTHOR}" \
--label "sync-failure,needs-attention"
echo "❌ Rebase failed. Issue created for manual resolution."
-
diff --git a/go.mod b/go.mod
index 0c1803a7..9304f06e 100644
--- a/go.mod
+++ b/go.mod
@@ -30,8 +30,8 @@ require (
github.com/golang/protobuf v1.5.4
github.com/jhump/protoreflect v1.17.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
- github.com/lestrrat-go/httprc/v3 v3.0.0-beta1
- github.com/lestrrat-go/jwx/v3 v3.0.0
+ github.com/lestrrat-go/httprc/v3 v3.0.1
+ github.com/lestrrat-go/jwx/v3 v3.0.12
github.com/mark3labs/mcp-go v0.32.0
github.com/mitchellh/mapstructure v1.5.0
github.com/nacos-group/nacos-sdk-go v1.1.3
@@ -44,7 +44,7 @@ require (
github.com/spf13/cast v1.7.1
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.8.1
- github.com/stretchr/testify v1.10.0
+ github.com/stretchr/testify v1.11.1
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.8.12
@@ -60,8 +60,8 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.32.1
go.opentelemetry.io/otel/trace v1.10.0
go.uber.org/zap v1.21.0
- golang.org/x/crypto v0.41.0
- golang.org/x/net v0.43.0
+ golang.org/x/crypto v0.43.0
+ golang.org/x/net v0.45.0
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v3 v3.0.1
@@ -178,9 +178,12 @@ require (
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
- github.com/lestrrat-go/blackmagic v1.0.2 // indirect
+ github.com/lestrrat-go/blackmagic v1.0.4 // indirect
+ github.com/lestrrat-go/dsig v1.0.0 // indirect
+ github.com/lestrrat-go/dsig-secp256k1 v1.0.0 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
+ github.com/lestrrat-go/option/v2 v2.0.0 // indirect
github.com/lestrrat-go/strftime v1.1.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 //
indirect
github.com/magiconair/properties v1.8.5 // indirect
@@ -211,7 +214,7 @@ require (
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 //
indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
- github.com/segmentio/asm v1.2.0 // indirect
+ github.com/segmentio/asm v1.2.1 // indirect
github.com/shirou/gopsutil/v3 v3.22.2 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/smartystreets/assertions v1.2.0 // indirect
@@ -228,6 +231,7 @@ require (
github.com/uber/jaeger-client-go v2.29.1+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
+ github.com/valyala/fastjson v1.6.4 // indirect
github.com/wasmerio/wasmer-go v1.0.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb //
indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415
// indirect
@@ -244,11 +248,11 @@ require (
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.20.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
- golang.org/x/sync v0.16.0 // indirect
- golang.org/x/sys v0.35.0 // indirect
- golang.org/x/text v0.28.0 // indirect
+ golang.org/x/sync v0.17.0 // indirect
+ golang.org/x/sys v0.37.0 // indirect
+ golang.org/x/text v0.30.0 // indirect
golang.org/x/time v0.12.0 // indirect
- golang.org/x/tools v0.36.0 // indirect
+ golang.org/x/tools v0.37.0 // indirect
google.golang.org/genproto/googleapis/api
v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20240604185151-ef581f913117 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
diff --git a/go.sum b/go.sum
index 57818b99..8da9ce63 100644
--- a/go.sum
+++ b/go.sum
@@ -1129,20 +1129,26 @@ github.com/kylelemons/godebug v1.1.0/go.mod
h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
github.com/leodido/go-urn v1.2.2/go.mod
h1:kUaIbLZWttglzwNuG0pgsh5vuV6u2YcGBYz1hIPjtOQ=
github.com/leodido/go-urn v1.4.0
h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod
h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
-github.com/lestrrat-go/blackmagic v1.0.2
h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k=
-github.com/lestrrat-go/blackmagic v1.0.2/go.mod
h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU=
+github.com/lestrrat-go/blackmagic v1.0.4
h1:IwQibdnf8l2KoO+qC3uT4OaTWsW7tuRQXy9TRN9QanA=
+github.com/lestrrat-go/blackmagic v1.0.4/go.mod
h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw=
+github.com/lestrrat-go/dsig v1.0.0
h1:OE09s2r9Z81kxzJYRn07TFM9XA4akrUdoMwr0L8xj38=
+github.com/lestrrat-go/dsig v1.0.0/go.mod
h1:dEgoOYYEJvW6XGbLasr8TFcAxoWrKlbQvmJgCR0qkDo=
+github.com/lestrrat-go/dsig-secp256k1 v1.0.0
h1:JpDe4Aybfl0soBvoVwjqDbp+9S1Y2OM7gcrVVMFPOzY=
+github.com/lestrrat-go/dsig-secp256k1 v1.0.0/go.mod
h1:CxUgAhssb8FToqbL8NjSPoGQlnO4w3LG1P0qPWQm/NU=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc
h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod
h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod
h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
github.com/lestrrat-go/httpcc v1.0.1
h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE=
github.com/lestrrat-go/httpcc v1.0.1/go.mod
h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E=
-github.com/lestrrat-go/httprc/v3 v3.0.0-beta1
h1:pzDjP9dSONCFQC/AE3mWUnHILGiYPiMKzQIS+weKJXA=
-github.com/lestrrat-go/httprc/v3 v3.0.0-beta1/go.mod
h1:wdsgouffPvWPEYh8t7PRH/PidR5sfVqt0na4Nhj60Ms=
-github.com/lestrrat-go/jwx/v3 v3.0.0
h1:IRnFNdZx5dJHjTpPVkYqP6TRahJI2Z9v43UwEDJcj6U=
-github.com/lestrrat-go/jwx/v3 v3.0.0/go.mod
h1:ak32WoNtHE0aLowVWBcCvXngcAnW4tuC0YhFwOr/kwc=
+github.com/lestrrat-go/httprc/v3 v3.0.1
h1:3n7Es68YYGZb2Jf+k//llA4FTZMl3yCwIjFIk4ubevI=
+github.com/lestrrat-go/httprc/v3 v3.0.1/go.mod
h1:2uAvmbXE4Xq8kAUjVrZOq1tZVYYYs5iP62Cmtru00xk=
+github.com/lestrrat-go/jwx/v3 v3.0.12
h1:p25r68Y4KrbBdYjIsQweYxq794CtGCzcrc5dGzJIRjg=
+github.com/lestrrat-go/jwx/v3 v3.0.12/go.mod
h1:HiUSaNmMLXgZ08OmGBaPVvoZQgJVOQphSrGr5zMamS8=
github.com/lestrrat-go/option v1.0.1
h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
github.com/lestrrat-go/option v1.0.1/go.mod
h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
+github.com/lestrrat-go/option/v2 v2.0.0
h1:XxrcaJESE1fokHy3FpaQ/cXW8ZsIdWcdFzzLOcID3Ss=
+github.com/lestrrat-go/option/v2 v2.0.0/go.mod
h1:oSySsmzMoR0iRzCDCaUfsCzxQHUEuhOViQObyy7S6Vg=
github.com/lestrrat-go/strftime v1.1.1
h1:zgf8QCsgj27GlKBy3SU9/8MMgegZ8UCzlCyHYrUF0QU=
github.com/lestrrat-go/strftime v1.1.1/go.mod
h1:YDrzHJAODYQ+xxvrn5SG01uFIQAeDTzpxNVppCz7Nmw=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod
h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
@@ -1385,8 +1391,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod
h1:sm1tb6uqfes/u+d4ooFo
github.com/ryanuber/go-glob v1.0.0/go.mod
h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod
h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod
h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
-github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
-github.com/segmentio/asm v1.2.0/go.mod
h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
+github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
+github.com/segmentio/asm v1.2.1/go.mod
h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod
h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.21.6/go.mod
h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shirou/gopsutil/v3 v3.22.2
h1:wCrArWFkHYIdDxx/FSfF5RB4dpJYW6t7rcp3+zL8uks=
@@ -1461,8 +1467,8 @@ github.com/stretchr/testify v1.8.2/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3/go.mod
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4/go.mod
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
-github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
-github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/stretchr/testify v1.11.1
h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod
h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.2.0
h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod
h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE=
@@ -1505,6 +1511,8 @@ github.com/ugorji/go/codec v1.2.12/go.mod
h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZ
github.com/urfave/cli v1.20.0/go.mod
h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod
h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.3.0/go.mod
h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/valyala/fastjson v1.6.4
h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
+github.com/valyala/fastjson v1.6.4/go.mod
h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/wasmerio/wasmer-go v1.0.3
h1:9pWIlIqUKxALvFlWK8+Zy90qyqxd+8wlyVG91txh1TU=
github.com/wasmerio/wasmer-go v1.0.3/go.mod
h1:0gzVdSfg6pysA6QVp6iVRPTagC6Wq9pOE8J86WKb2Fk=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod
h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
@@ -1657,8 +1665,8 @@ golang.org/x/crypto v0.14.0/go.mod
h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf
golang.org/x/crypto v0.18.0/go.mod
h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod
h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod
h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
-golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
-golang.org/x/crypto v0.41.0/go.mod
h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
+golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
+golang.org/x/crypto v0.43.0/go.mod
h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -1704,8 +1712,8 @@ golang.org/x/mod
v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
-golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
+golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U=
+golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1788,8 +1796,8 @@ golang.org/x/net v0.17.0/go.mod
h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
-golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
-golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
+golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
+golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1834,8 +1842,8 @@ golang.org/x/sync
v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
-golang.org/x/sync v0.16.0/go.mod
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod
h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -1956,8 +1964,8 @@ golang.org/x/sys v0.13.0/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
-golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
@@ -1989,8 +1997,8 @@ golang.org/x/text v0.9.0/go.mod
h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.10.0/go.mod
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0/go.mod
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod
h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
-golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
-golang.org/x/text v0.28.0/go.mod
h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
+golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
+golang.org/x/text v0.30.0/go.mod
h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -2074,8 +2082,8 @@ golang.org/x/tools v0.1.12/go.mod
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.3.0/go.mod
h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0/go.mod
h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod
h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
-golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
-golang.org/x/tools v0.36.0/go.mod
h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
+golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE=
+golang.org/x/tools v0.37.0/go.mod
h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pkg/adapter/mcpserver/registry/nacos/client_test.go
b/pkg/adapter/mcpserver/registry/nacos/client_test.go
index 1c953fb9..2b89760d 100644
--- a/pkg/adapter/mcpserver/registry/nacos/client_test.go
+++ b/pkg/adapter/mcpserver/registry/nacos/client_test.go
@@ -22,7 +22,6 @@ import (
"regexp"
"sort"
"strings"
- "sync"
"testing"
"time"
)
@@ -115,14 +114,11 @@ func createBrokenJSON() string {
}
type MockedNacosConfigClient struct {
- mu sync.Mutex
configs map[string]any
configListenerMap map[string][]func(string, string, string, string)
}
-func (m *MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string,
error) {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (m MockedNacosConfigClient) GetConfig(param vo.ConfigParam) (string,
error) {
if result, exist := m.configs[param.DataId+"$$"+param.Group]; exist {
config, ok := result.(string)
if ok {
@@ -139,19 +135,17 @@ func (m *MockedNacosConfigClient) GetConfig(param
vo.ConfigParam) (string, error
return "", nil
}
-func (m *MockedNacosConfigClient) PublishConfig(param vo.ConfigParam) (bool,
error) {
+func (m MockedNacosConfigClient) PublishConfig(_ vo.ConfigParam) (bool, error)
{
//TODO implement me
panic("implement me")
}
-func (m *MockedNacosConfigClient) DeleteConfig(param vo.ConfigParam) (bool,
error) {
+func (m MockedNacosConfigClient) DeleteConfig(_ vo.ConfigParam) (bool, error) {
//TODO implement me
panic("implement me")
}
-func (m *MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err
error) {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (m MockedNacosConfigClient) ListenConfig(params vo.ConfigParam) (err
error) {
if _, ok := m.configListenerMap[params.Group]; !ok {
m.configListenerMap[params.Group] = []func(string, string,
string, string){}
}
@@ -159,25 +153,12 @@ func (m *MockedNacosConfigClient) ListenConfig(params
vo.ConfigParam) (err error
return nil
}
-func (m *MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam)
(err error) {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (m MockedNacosConfigClient) CancelListenConfig(params vo.ConfigParam)
(err error) {
delete(m.configListenerMap, params.DataId+"$$"+params.Group)
return nil
}
-func (m *MockedNacosConfigClient) GetListener(key string, index int)
func(string, string, string, string) {
- m.mu.Lock()
- defer m.mu.Unlock()
- if listeners, ok := m.configListenerMap[key]; ok && len(listeners) >
index {
- return listeners[index]
- }
- return nil
-}
-
-func (m *MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam)
(*model.ConfigPage, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (m MockedNacosConfigClient) SearchConfig(param vo.SearchConfigParam)
(*model.ConfigPage, error) {
dataIdRegex := strings.ReplaceAll(param.DataId, "*", ".*")
groupRegex := strings.ReplaceAll(param.Group, "*", ".*")
result := []model.ConfigItem{}
@@ -213,7 +194,7 @@ func (m *MockedNacosConfigClient) SearchConfig(param
vo.SearchConfigParam) (*mod
}, nil
}
-func (m *MockedNacosConfigClient) CloseClient() {
+func (m MockedNacosConfigClient) CloseClient() {
//TODO implement me
panic("implement me")
}
@@ -222,22 +203,22 @@ type MockedNacosNamingClient struct {
listenerMap map[string][]func(services []model.Instance, err error)
}
-func (m MockedNacosNamingClient) RegisterInstance(param
vo.RegisterInstanceParam) (bool, error) {
+func (m MockedNacosNamingClient) RegisterInstance(_ vo.RegisterInstanceParam)
(bool, error) {
//TODO implement me
panic("implement me")
}
-func (m MockedNacosNamingClient) BatchRegisterInstance(param
vo.BatchRegisterInstanceParam) (bool, error) {
+func (m MockedNacosNamingClient) BatchRegisterInstance(_
vo.BatchRegisterInstanceParam) (bool, error) {
//TODO implement me
panic("implement me")
}
-func (m MockedNacosNamingClient) DeregisterInstance(param
vo.DeregisterInstanceParam) (bool, error) {
+func (m MockedNacosNamingClient) DeregisterInstance(_
vo.DeregisterInstanceParam) (bool, error) {
//TODO implement me
panic("implement me")
}
-func (m MockedNacosNamingClient) UpdateInstance(param vo.UpdateInstanceParam)
(bool, error) {
+func (m MockedNacosNamingClient) UpdateInstance(_ vo.UpdateInstanceParam)
(bool, error) {
//TODO implement me
panic("implement me")
}
@@ -255,17 +236,17 @@ func (m MockedNacosNamingClient) GetService(param
vo.GetServiceParam) (model.Ser
}, nil
}
-func (m MockedNacosNamingClient) SelectAllInstances(param
vo.SelectAllInstancesParam) ([]model.Instance, error) {
+func (m MockedNacosNamingClient) SelectAllInstances(_
vo.SelectAllInstancesParam) ([]model.Instance, error) {
//TODO implement me
panic("implement me")
}
-func (m MockedNacosNamingClient) SelectInstances(param
vo.SelectInstancesParam) ([]model.Instance, error) {
+func (m MockedNacosNamingClient) SelectInstances(_ vo.SelectInstancesParam)
([]model.Instance, error) {
//TODO implement me
panic("implement me")
}
-func (m MockedNacosNamingClient) SelectOneHealthyInstance(param
vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
+func (m MockedNacosNamingClient) SelectOneHealthyInstance(_
vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
//TODO implement me
panic("implement me")
}
@@ -278,11 +259,11 @@ func (m MockedNacosNamingClient) Subscribe(param
*vo.SubscribeParam) error {
return nil
}
-func (m MockedNacosNamingClient) Unsubscribe(param *vo.SubscribeParam) error {
+func (m MockedNacosNamingClient) Unsubscribe(_ *vo.SubscribeParam) error {
return nil
}
-func (m MockedNacosNamingClient) GetAllServicesInfo(param
vo.GetAllServiceInfoParam) (model.ServiceList, error) {
+func (m MockedNacosNamingClient) GetAllServicesInfo(_
vo.GetAllServiceInfoParam) (model.ServiceList, error) {
//TODO implement me
panic("implement me")
}
@@ -307,7 +288,7 @@ func TestNacosRegistryClient_ListMcpServer(t *testing.T) {
}
client := NacosRegistryClient{
- configClient: &MockedNacosConfigClient{configs: mockedConfigs},
+ configClient: MockedNacosConfigClient{configs: mockedConfigs},
}
servers, err := client.ListMcpServer()
@@ -353,7 +334,7 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
serverConfigKey113 := fmt.Sprintf("%s-%s-mcp-server.json%smcp-server",
testMcpServerID, testVersion113, configKeySeparator)
toolsConfigKey113 := fmt.Sprintf("%s-%s-mcp-tools.json%smcp-tools",
testMcpServerID, testVersion113, configKeySeparator)
- configClient := &MockedNacosConfigClient{
+ configClient := MockedNacosConfigClient{
configs: map[string]any{
versionConfigKey:
createExploreServerVersionConfig(testVersion112),
serverConfigKey112:
createMcpServerConfig(testMcpServerID, testVersion112, testServiceName),
@@ -385,30 +366,15 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Set up listener for configuration changes
var newConfig *McpServerConfig
- var configMu sync.RWMutex
-
- // Helper functions to safely access newConfig
- getConfig := func() *McpServerConfig {
- configMu.RLock()
- defer configMu.RUnlock()
- return newConfig
- }
-
- setConfig := func(cfg *McpServerConfig) {
- configMu.Lock()
- defer configMu.Unlock()
- newConfig = cfg
- }
-
err = client.ListenToMcpServer(testMcpServerID, func(info
*McpServerConfig) {
- setConfig(info)
+ newConfig = info
})
if err != nil {
t.Fatalf("Failed to start listening to MCP server: %v", err)
}
// Wait for initial configuration to be loaded
- for i := 0; i < testRetryMaxAttempts && getConfig() == nil; i++ {
+ for i := 0; i < testRetryMaxAttempts && newConfig == nil; i++ {
time.Sleep(testRetryInterval)
}
@@ -418,21 +384,19 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Replace nacos template with processed version
expectedToolsConfig = strings.ReplaceAll(expectedToolsConfig,
fmt.Sprintf("${nacos.%s/%s}", testConfigKey, testConfigKey),
fmt.Sprintf(".config.credentials.%s", testCredentialKey))
- cfg := getConfig()
- assert.Equal(t, expectedServerConfig, cfg.ServerSpecConfig)
- assert.Equal(t, expectedToolsConfig, cfg.ToolsSpecConfig)
- assert.Equal(t, 1, len(cfg.Credentials))
- assert.Equal(t, map[string]any{"key": testSecretKey},
cfg.Credentials[testCredentialKey])
+ assert.Equal(t, expectedServerConfig, newConfig.ServerSpecConfig)
+ assert.Equal(t, expectedToolsConfig, newConfig.ToolsSpecConfig)
+ assert.Equal(t, 1, len(newConfig.Credentials))
+ assert.Equal(t, map[string]any{"key": testSecretKey},
newConfig.Credentials[testCredentialKey])
// Test case 1: Change tool nacos template reference
- listener := configClient.GetListener(toolsConfigKey112, 0)
+ listener := configClient.configListenerMap[toolsConfigKey112][0]
updatedToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s",
testConfigKey1, testConfigKey1))
listener(testNamespace, "mcp-tools", toolsConfigKey112,
updatedToolsConfig)
// Wait for tools update to propagate
for i := 0; i < testRetryMaxAttempts; i++ {
- cfg := getConfig()
- if cfg != nil && strings.Contains(cfg.ToolsSpecConfig,
testCredentialKey1) {
+ if newConfig != nil &&
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey1) {
break
}
time.Sleep(testRetryInterval)
@@ -440,33 +404,30 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Verify updated tools configuration
expectedUpdatedToolsConfig := strings.ReplaceAll(updatedToolsConfig,
fmt.Sprintf("${nacos.%s/%s}", testConfigKey1, testConfigKey1),
fmt.Sprintf(".config.credentials.%s", testCredentialKey1))
- cfg = getConfig()
- assert.Equal(t, expectedUpdatedToolsConfig, cfg.ToolsSpecConfig)
- assert.Equal(t, 1, len(cfg.Credentials))
- assert.Equal(t, map[string]any{"key": testSecretKey1},
cfg.Credentials[testCredentialKey1])
+ assert.Equal(t, expectedUpdatedToolsConfig, newConfig.ToolsSpecConfig)
+ assert.Equal(t, 1, len(newConfig.Credentials))
+ assert.Equal(t, map[string]any{"key": testSecretKey1},
newConfig.Credentials[testCredentialKey1])
// Test case 2: Change backend service name
- serviceListener := configClient.GetListener(serverConfigKey112, 0)
+ serviceListener := configClient.configListenerMap[serverConfigKey112][0]
updatedServerConfig := createMcpServerConfig(testMcpServerID,
testVersion112, testServiceNameNew)
serviceListener(testNamespace, "mcp-server", serverConfigKey112,
updatedServerConfig)
for i := 0; i < testRetryMaxAttempts; i++ {
- cfg := getConfig()
- if cfg != nil && strings.Contains(cfg.ServerSpecConfig,
testServiceNameNew) {
+ if newConfig != nil &&
strings.Contains(newConfig.ServerSpecConfig, testServiceNameNew) {
break
}
time.Sleep(testRetryInterval)
}
// Test case 3: Publish new version of MCP server
- versionListener := configClient.GetListener(versionConfigKey, 0)
+ versionListener := configClient.configListenerMap[versionConfigKey][0]
updatedVersionConfig := createExploreServerVersionConfig(testVersion113)
versionListener(testNamespace, testGroupNameMcpVersions,
versionConfigKey, updatedVersionConfig)
// Wait for version update to trigger server config change
for i := 0; i < testRetryMaxAttempts; i++ {
- cfg := getConfig()
- if cfg != nil && strings.Contains(cfg.ServerSpecConfig,
fmt.Sprintf("\"version\":\"%s\"", testVersion113)) {
+ if newConfig != nil &&
strings.Contains(newConfig.ServerSpecConfig, fmt.Sprintf("\"version\":\"%s\"",
testVersion113)) {
break
}
time.Sleep(testRetryInterval)
@@ -474,8 +435,7 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
// Wait for tools config to update to new version reference
for i := 0; i < testRetryMaxAttempts; i++ {
- cfg := getConfig()
- if cfg != nil && strings.Contains(cfg.ToolsSpecConfig,
testCredentialKey3) {
+ if newConfig != nil &&
strings.Contains(newConfig.ToolsSpecConfig, testCredentialKey3) {
break
}
time.Sleep(testRetryInterval)
@@ -486,9 +446,8 @@ func TestNacosRegistryClient_ListenToMcpServer(t
*testing.T) {
expectedFinalToolsConfig := createMcpToolsConfig(fmt.Sprintf("%s/%s",
testConfigKey3, testConfigKey3))
expectedFinalToolsConfig = strings.ReplaceAll(expectedFinalToolsConfig,
fmt.Sprintf("${nacos.%s/%s}", testConfigKey3, testConfigKey3),
fmt.Sprintf(".config.credentials.%s", testCredentialKey3))
- cfg = getConfig()
- assert.Equal(t, expectedFinalServerConfig, cfg.ServerSpecConfig)
- assert.Equal(t, expectedFinalToolsConfig, cfg.ToolsSpecConfig)
- assert.Equal(t, 1, len(cfg.Credentials))
- assert.Equal(t, map[string]any{"key": testSecretKey3},
cfg.Credentials[testCredentialKey3])
+ assert.Equal(t, expectedFinalServerConfig, newConfig.ServerSpecConfig)
+ assert.Equal(t, expectedFinalToolsConfig, newConfig.ToolsSpecConfig)
+ assert.Equal(t, 1, len(newConfig.Credentials))
+ assert.Equal(t, map[string]any{"key": testSecretKey3},
newConfig.Credentials[testCredentialKey3])
}
diff --git a/pkg/adapter/mcpserver/registrycenter.go
b/pkg/adapter/mcpserver/registrycenter.go
index 9cf5326f..6351f72f 100644
--- a/pkg/adapter/mcpserver/registrycenter.go
+++ b/pkg/adapter/mcpserver/registrycenter.go
@@ -167,7 +167,7 @@ func (a *Adapter) Apply() error {
}
// 1) apply tools dynamically to registry for filter
usage
- if dc := mcpserver.GetOrInitDynamic(); dc != nil {
+ if dc := mcpserver.GetOrInitDynamicConsumer(); dc !=
nil {
if err :=
dc.ApplyMcpServerConfigByServer(serverId, cfg); err != nil {
logger.Errorf("[dubbo-go-pixiu] mcp
adapter apply server %s config error: %v", serverId, err)
}
diff --git a/pkg/common/constant/http.go b/pkg/common/constant/http.go
index 287ac3b9..2156ab90 100644
--- a/pkg/common/constant/http.go
+++ b/pkg/common/constant/http.go
@@ -25,6 +25,7 @@ const (
HeaderKeyContentLength = "Content-Length"
HeaderKeyContentEncoding = "Content-Encoding"
HeaderKeyUserAgent = "User-Agent"
+ HeaderKeyAccept = "Accept"
HeaderKeyAccessControlAllowOrigin = "Access-Control-Allow-Origin"
HeaderKeyAccessControlAllowHeaders = "Access-Control-Allow-Headers"
@@ -43,6 +44,11 @@ const (
HeaderValueChunked = "chunked"
HeaderValueTextPrefix = "text/"
+ // Media type wildcards for Accept header negotiation
+ MediaTypeWildcard = "*/*"
+ MediaTypeApplicationWild = "application/*"
+ MediaTypeTextWild = "text/*"
+
HeaderValueGzip = "gzip"
HeaderValueDeflate = "deflate"
@@ -94,6 +100,18 @@ const (
XForwardedProto = "X-Forwarded-Proto"
)
+// MCP (Model Context Protocol) specific headers
+const (
+ HeaderKeyMCPProtocolVersion = "Mcp-Protocol-Version"
+ HeaderKeyMCPSessionId = "Mcp-Session-Id"
+)
+
+// MCP protocol versions
+const (
+ MCPProtocolVersion20250618 = "2025-06-18"
+ MCPProtocolVersion20250326 = "2025-03-26"
+)
+
// SSE response prefixes
const (
SSEData = "data"
diff --git a/pkg/filter/mcp/mcpserver/context.go
b/pkg/filter/mcp/mcpserver/context.go
index de9877c6..3b7fa6bf 100644
--- a/pkg/filter/mcp/mcpserver/context.go
+++ b/pkg/filter/mcp/mcpserver/context.go
@@ -34,6 +34,14 @@ type MCPData struct {
Method string
// RequestID stores JSON-RPC request ID
RequestID any
+ // SessionID stores MCP session ID for SSE connections
+ SessionID string
+ // AcceptSSE indicates if client accepts text/event-stream
+ AcceptSSE bool
+ // AcceptJSON indicates if client accepts application/json
+ AcceptJSON bool
+ // ProtocolVersion stores MCP protocol version from header
+ ProtocolVersion string
}
// MCPContext MCP context wrapper that composes HttpContext and provides
MCP-specific operations
@@ -109,3 +117,101 @@ func NewMCPContextFromHttpContext(httpCtx
*contexthttp.HttpContext) *MCPContext
func (ctx *MCPContext) ClearContentLengthHeader() {
ctx.Writer.Header().Del(constant.HeaderKeyContentLength)
}
+
+// SessionID management methods
+
+// SetSessionID sets MCP session ID
+func (ctx *MCPContext) SetSessionID(sessionID string) {
+ ctx.mcpData.SessionID = sessionID
+}
+
+// SessionID gets MCP session ID
+func (ctx *MCPContext) SessionID() string {
+ return ctx.mcpData.SessionID
+}
+
+// HasSession checks if context has a session ID
+func (ctx *MCPContext) HasSession() bool {
+ return ctx.mcpData.SessionID != ""
+}
+
+// Accept header management methods
+
+// SetAcceptSSE sets if client accepts text/event-stream
+func (ctx *MCPContext) SetAcceptSSE(acceptSSE bool) {
+ ctx.mcpData.AcceptSSE = acceptSSE
+}
+
+// AcceptSSE gets if client accepts text/event-stream
+func (ctx *MCPContext) AcceptSSE() bool {
+ return ctx.mcpData.AcceptSSE
+}
+
+// SetAcceptJSON sets if client accepts application/json
+func (ctx *MCPContext) SetAcceptJSON(acceptJSON bool) {
+ ctx.mcpData.AcceptJSON = acceptJSON
+}
+
+// AcceptJSON gets if client accepts application/json
+func (ctx *MCPContext) AcceptJSON() bool {
+ return ctx.mcpData.AcceptJSON
+}
+
+// Protocol version management methods
+
+// SetProtocolVersion sets MCP protocol version
+func (ctx *MCPContext) SetProtocolVersion(version string) {
+ ctx.mcpData.ProtocolVersion = version
+}
+
+// ProtocolVersion gets MCP protocol version
+func (ctx *MCPContext) ProtocolVersion() string {
+ return ctx.mcpData.ProtocolVersion
+}
+
+// ParseAndSetAcceptHeader parses Accept header and sets AcceptSSE/AcceptJSON
flags
+func (ctx *MCPContext) ParseAndSetAcceptHeader() {
+ acceptHeader := ctx.Request.Header.Get(constant.HeaderKeyAccept)
+ ctx.mcpData.AcceptJSON = acceptHeader == "" || // Default to JSON for
backward compatibility
+ containsMediaType(acceptHeader,
constant.HeaderValueApplicationJson) ||
+ containsMediaType(acceptHeader,
constant.MediaTypeApplicationWild) ||
+ containsMediaType(acceptHeader, constant.MediaTypeWildcard)
+
+ ctx.mcpData.AcceptSSE = containsMediaType(acceptHeader,
constant.HeaderValueTextEventStream) ||
+ containsMediaType(acceptHeader, constant.MediaTypeTextWild) ||
+ containsMediaType(acceptHeader, constant.MediaTypeWildcard)
+}
+
+// ParseAndSetSessionHeader parses Mcp-Session-Id header and sets session ID
+func (ctx *MCPContext) ParseAndSetSessionHeader() {
+ sessionID := ctx.Request.Header.Get(constant.HeaderKeyMCPSessionId)
+ ctx.mcpData.SessionID = sessionID
+}
+
+// ParseAndSetProtocolVersionHeader parses MCP-Protocol-Version header
+func (ctx *MCPContext) ParseAndSetProtocolVersionHeader() {
+ version := ctx.Request.Header.Get(constant.HeaderKeyMCPProtocolVersion)
+ ctx.mcpData.ProtocolVersion = version
+}
+
+// containsMediaType checks if the Accept header contains the specified media
type
+func containsMediaType(acceptHeader, mediaType string) bool {
+ if acceptHeader == "" {
+ return false
+ }
+ // Simple contains check - could be enhanced with proper media type
parsing
+ return len(acceptHeader) > 0 && (acceptHeader == mediaType ||
+ len(acceptHeader) >= len(mediaType) &&
(acceptHeader[:len(mediaType)] == mediaType ||
+ acceptHeader[len(acceptHeader)-len(mediaType):] ==
mediaType ||
+ containsSubstring(acceptHeader, mediaType)))
+}
+
+// containsSubstring is a helper function for media type checking
+func containsSubstring(s, substr string) bool {
+ for i := 0; i <= len(s)-len(substr); i++ {
+ if s[i:i+len(substr)] == substr {
+ return true
+ }
+ }
+ return false
+}
diff --git a/pkg/filter/mcp/mcpserver/dynamic.go
b/pkg/filter/mcp/mcpserver/dynamic.go
index a2b1c718..737fe6c4 100644
--- a/pkg/filter/mcp/mcpserver/dynamic.go
+++ b/pkg/filter/mcp/mcpserver/dynamic.go
@@ -20,6 +20,7 @@ package mcpserver
import (
"crypto/sha256"
"encoding/hex"
+ "encoding/json"
"fmt"
"sort"
"sync"
@@ -27,6 +28,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
@@ -39,15 +41,6 @@ const (
EmptyFingerprint = "00000000"
)
-var (
- globalRegistry *ToolRegistry
- globalDynamic *DynamicConsumer
-
- // sync.Once variables for thread-safe singleton initialization
- registryOnce sync.Once
- dynamicOnce sync.Once
-)
-
// ServerToolConfig tool configuration for a single server
type ServerToolConfig struct {
Tools []model.ToolConfig
@@ -55,25 +48,11 @@ type ServerToolConfig struct {
LastApplied time.Time
}
-// GetOrInitRegistry returns a singleton ToolRegistry
-func GetOrInitRegistry() *ToolRegistry {
- registryOnce.Do(func() {
- globalRegistry = NewToolRegistry()
- })
- return globalRegistry
-}
-
-// GetOrInitDynamic returns a singleton DynamicConsumer
-func GetOrInitDynamic() *DynamicConsumer {
- dynamicOnce.Do(func() {
- globalDynamic = NewDynamicConsumer(GetOrInitRegistry())
- })
- return globalDynamic
-}
-
// DynamicConsumer applies dynamic MCP configurations into the registry
type DynamicConsumer struct {
- registry *ToolRegistry
+ registry *ToolRegistry
+ sessionManager *transport.SessionManager
+ sseHandler *transport.SSEHandler
// Tool configuration management grouped by server
mu sync.RWMutex
@@ -81,11 +60,13 @@ type DynamicConsumer struct {
debounceTime time.Duration
}
-func NewDynamicConsumer(reg *ToolRegistry) *DynamicConsumer {
+func NewDynamicConsumer(reg *ToolRegistry, sm *transport.SessionManager,
sseHandler *transport.SSEHandler) *DynamicConsumer {
return &DynamicConsumer{
- registry: reg,
- serverConfigs: make(map[string]*ServerToolConfig),
- debounceTime: DefaultDebounceTime,
+ registry: reg,
+ sessionManager: sm,
+ sseHandler: sseHandler,
+ serverConfigs: make(map[string]*ServerToolConfig),
+ debounceTime: DefaultDebounceTime,
}
}
@@ -144,6 +125,9 @@ func (d *DynamicConsumer)
ApplyMcpServerConfigByServer(serverId string, cfg *mod
logger.Infof("[dubbo-go-pixiu] mcp server %s config applied: %d tools,
total servers: %d, merged tools: %d",
serverId, len(cfg.Tools), len(d.serverConfigs),
len(mergedTools))
+ // Notify all connected clients about tools list change
+ d.notifyToolsListChanged()
+
return nil
}
@@ -166,7 +150,8 @@ func (d *DynamicConsumer) calculateFingerprint(tools
[]model.ToolConfig) string
// Build hash input string
hash := sha256.New()
for _, tool := range sortedTools {
- fmt.Fprintf(hash, "name:%s;cluster:%s;args:%d;", tool.Name,
tool.Cluster, len(tool.Args))
+ _, _ = fmt.Fprintf(hash, "name:%s;cluster:%s;args:%d;",
tool.Name, tool.Cluster, len(tool.Args))
+
}
// Return first 8 characters of hex encoded hash
@@ -246,3 +231,63 @@ func (d *DynamicConsumer) calculateCurrentMergedTools()
[]model.ToolConfig {
return allTools
}
+
+// notifyToolsListChanged sends notifications/tools/list_changed to all
connected clients
+func (d *DynamicConsumer) notifyToolsListChanged() {
+ if d.sessionManager == nil {
+ logger.Debugf("[dubbo-go-pixiu] mcp server session manager not
available, skip tools list_changed notification")
+ return
+ }
+
+ // Get all active sessions
+ sessionIDs := d.sessionManager.AllSessionIDs()
+ if len(sessionIDs) == 0 {
+ logger.Debugf("[dubbo-go-pixiu] mcp server no active sessions,
skip tools list_changed notification")
+ return
+ }
+
+ // Send notification to each session
+ successCount := 0
+ for _, sessionID := range sessionIDs {
+ if err := d.sendToolsListChangedNotification(sessionID); err !=
nil {
+ logger.Warnf("[dubbo-go-pixiu] mcp server failed to
send tools list_changed to session %s: %v", sessionID, err)
+ } else {
+ successCount++
+ }
+ }
+
+ logger.Infof("[dubbo-go-pixiu] mcp server sent tools/list_changed
notification to %d/%d sessions", successCount, len(sessionIDs))
+}
+
+// sendToolsListChangedNotification sends notification to a specific session
+func (d *DynamicConsumer) sendToolsListChangedNotification(sessionID string)
error {
+ session, exists := d.sessionManager.Session(sessionID)
+ if !exists {
+ return fmt.Errorf("session not found")
+ }
+
+ if session.PipeWriter == nil {
+ return fmt.Errorf("SSE pipe not established")
+ }
+
+ // Build tools/list_changed notification (no params needed)
+ notification := map[string]any{
+ "jsonrpc": "2.0",
+ "method": "notifications/tools/list_changed",
+ }
+
+ messageJSON, err := json.Marshal(notification)
+ if err != nil {
+ return fmt.Errorf("failed to marshal notification: %w", err)
+ }
+
+ sseData := d.sseHandler.FormatSSEMessage(string(messageJSON))
+
+ if _, err := session.PipeWriter.Write([]byte(sseData)); err != nil {
+ return fmt.Errorf("failed to write to SSE pipe: %w", err)
+ }
+
+ session.LastActivity = time.Now()
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent tools/list_changed to
session: %s", sessionID)
+ return nil
+}
diff --git a/pkg/filter/mcp/mcpserver/dynamic_notification_test.go
b/pkg/filter/mcp/mcpserver/dynamic_notification_test.go
new file mode 100644
index 00000000..a74592d9
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/dynamic_notification_test.go
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+ "io"
+ "strings"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func TestNotifyToolsListChanged(t *testing.T) {
+ // Reset global state
+ ResetGlobalState()
+ defer ResetGlobalState()
+
+ consumer := GetOrInitDynamicConsumer()
+ sm := GetOrInitSessionManager()
+
+ // Create session with SSE pipe
+ session, _ := sm.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+ defer pipeReader.Close()
+ defer pipeWriter.Close()
+
+ // Apply config with tools (will trigger notification)
+ config := createTestMcpServerConfig([]model.ToolConfig{
+ {Name: "tool1", Cluster: "cluster1"},
+ })
+
+ // Read notification in background
+ notificationCh := make(chan string, 1)
+ go func() {
+ buf := make([]byte, 1024)
+ n, err := pipeReader.Read(buf)
+ if err == nil && n > 0 {
+ notificationCh <- string(buf[:n])
+ }
+ }()
+
+ // Apply config
+ consumer.ResetDebounceState()
+ err := consumer.ApplyMcpServerConfigByServer("server1", config)
+ assert.NoError(t, err)
+
+ // Wait for notification
+ select {
+ case notification := <-notificationCh:
+ // Verify notification format
+ assert.True(t, strings.HasPrefix(notification, "data:"),
"Should start with 'data:'")
+ assert.True(t, strings.HasSuffix(notification, "\n\n"), "Should
end with \\n\\n")
+ assert.True(t, strings.Contains(notification,
"notifications/tools/list_changed"), "Should contain method name")
+ assert.True(t, strings.Contains(notification, "jsonrpc"),
"Should contain jsonrpc field")
+ case <-time.After(1 * time.Second):
+ t.Fatal("Did not receive notification within timeout")
+ }
+}
+
+func TestNotifyToolsListChanged_MultipleSessions(t *testing.T) {
+ ResetGlobalState()
+ defer ResetGlobalState()
+
+ consumer := GetOrInitDynamicConsumer()
+ sm := GetOrInitSessionManager()
+
+ // Create multiple sessions with pipes
+ numSessions := 3
+ readers := make([]io.ReadCloser, numSessions)
+ writers := make([]*io.PipeWriter, numSessions)
+ notificationChs := make([]chan string, numSessions)
+
+ for i := 0; i < numSessions; i++ {
+ session, _ := sm.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+ readers[i] = pipeReader
+ writers[i] = pipeWriter
+ defer pipeReader.Close()
+ defer pipeWriter.Close()
+
+ // Start reader for each session
+ notificationChs[i] = make(chan string, 1)
+ go func(idx int, reader io.Reader, ch chan string) {
+ buf := make([]byte, 1024)
+ n, err := reader.Read(buf)
+ if err == nil && n > 0 {
+ ch <- string(buf[:n])
+ }
+ }(i, pipeReader, notificationChs[i])
+ }
+
+ // Apply config
+ config := createTestMcpServerConfig([]model.ToolConfig{
+ {Name: "tool1", Cluster: "cluster1"},
+ })
+
+ consumer.ResetDebounceState()
+ err := consumer.ApplyMcpServerConfigByServer("server1", config)
+ assert.NoError(t, err)
+
+ // Verify all sessions received notification
+ receivedCount := 0
+ for i := 0; i < numSessions; i++ {
+ select {
+ case notification := <-notificationChs[i]:
+ assert.Contains(t, notification,
"notifications/tools/list_changed")
+ receivedCount++
+ case <-time.After(1 * time.Second):
+ t.Logf("Session %d did not receive notification", i)
+ }
+ }
+
+ assert.Equal(t, numSessions, receivedCount, "All sessions should
receive notification")
+}
+
+func TestNotifyToolsListChanged_NoActiveSessions(t *testing.T) {
+ ResetGlobalState()
+ defer ResetGlobalState()
+
+ consumer := GetOrInitDynamicConsumer()
+
+ // Apply config without any active sessions
+ config := createTestMcpServerConfig([]model.ToolConfig{
+ {Name: "tool1", Cluster: "cluster1"},
+ })
+
+ consumer.ResetDebounceState()
+ err := consumer.ApplyMcpServerConfigByServer("server1", config)
+ assert.NoError(t, err)
+
+ // Should not panic or error, just skip notification
+ assert.Equal(t, 0, GetOrInitSessionManager().ActiveSessionCount())
+}
+
+func TestNotifyToolsListChanged_DisconnectedSession(t *testing.T) {
+ ResetGlobalState()
+ defer ResetGlobalState()
+
+ consumer := GetOrInitDynamicConsumer()
+ sm := GetOrInitSessionManager()
+
+ // Create session but don't attach pipe
+ _, _ = sm.EnsureSession("")
+ // session.PipeWriter is nil
+
+ // Apply config
+ config := createTestMcpServerConfig([]model.ToolConfig{
+ {Name: "tool1", Cluster: "cluster1"},
+ })
+
+ consumer.ResetDebounceState()
+ err := consumer.ApplyMcpServerConfigByServer("server1", config)
+ assert.NoError(t, err)
+
+ // Should handle disconnected session gracefully (logged as warning)
+ // No panic or error
+}
diff --git a/pkg/filter/mcp/mcpserver/dynamic_test.go
b/pkg/filter/mcp/mcpserver/dynamic_test.go
index 1e95b67c..e221402d 100644
--- a/pkg/filter/mcp/mcpserver/dynamic_test.go
+++ b/pkg/filter/mcp/mcpserver/dynamic_test.go
@@ -29,20 +29,13 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
//
=============================================================================
// Test Utilities
//
=============================================================================
-// resetSingletons resets singleton state for testing
-func resetSingletons() {
- globalRegistry = nil
- globalDynamic = nil
- registryOnce = sync.Once{}
- dynamicOnce = sync.Once{}
-}
-
// createTestToolConfig creates a simple test tool configuration
func createTestToolConfig(name, description string) model.ToolConfig {
return model.ToolConfig{
@@ -83,7 +76,7 @@ func createTestMcpServerConfig(tools []model.ToolConfig)
*model.McpServerConfig
//
=============================================================================
func TestSingletonInstances(t *testing.T) {
- resetSingletons()
+ ResetGlobalState()
t.Run("Registry singleton", func(t *testing.T) {
registry1 := GetOrInitRegistry()
@@ -92,15 +85,15 @@ func TestSingletonInstances(t *testing.T) {
})
t.Run("Dynamic consumer singleton", func(t *testing.T) {
- dynamic1 := GetOrInitDynamic()
- dynamic2 := GetOrInitDynamic()
+ dynamic1 := GetOrInitDynamicConsumer()
+ dynamic2 := GetOrInitDynamicConsumer()
assert.Same(t, dynamic1, dynamic2)
assert.Same(t, dynamic1.registry, GetOrInitRegistry())
})
}
func TestSingletonConcurrency(t *testing.T) {
- resetSingletons()
+ ResetGlobalState()
const numGoroutines = 50
var wg sync.WaitGroup
@@ -130,7 +123,10 @@ func TestSingletonConcurrency(t *testing.T) {
func TestApplyMcpServerConfig(t *testing.T) {
t.Run("Basic configuration application", func(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
// Test nil config
err := consumer.ApplyMcpServerConfigByServer("default", nil)
@@ -160,7 +156,10 @@ func TestApplyMcpServerConfig(t *testing.T) {
t.Run("Configuration replacement", func(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
// Apply first config
config1 := createTestMcpServerConfig([]model.ToolConfig{
@@ -187,9 +186,9 @@ func TestApplyMcpServerConfig(t *testing.T) {
}
func TestApplyMcpServerConfigConcurrent(t *testing.T) {
- resetSingletons()
+ ResetGlobalState()
registry := GetOrInitRegistry()
- consumer := GetOrInitDynamic()
+ consumer := GetOrInitDynamicConsumer()
const numGoroutines = 10
var wg sync.WaitGroup
@@ -222,7 +221,10 @@ func TestApplyMcpServerConfigConcurrent(t *testing.T) {
func TestDebounceFeatures(t *testing.T) {
t.Run("Content debounce - skip identical configs", func(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
config := createTestMcpServerConfig([]model.ToolConfig{
createTestToolConfig("tool1", "Test tool"),
@@ -245,7 +247,10 @@ func TestDebounceFeatures(t *testing.T) {
t.Run("Time debounce - skip rapid calls", func(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
config1 := createTestMcpServerConfig([]model.ToolConfig{
createTestToolConfig("tool1", "First tool"),
@@ -271,7 +276,10 @@ func TestDebounceFeatures(t *testing.T) {
t.Run("Empty configuration handling", func(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
// Add tool first
config := createTestMcpServerConfig([]model.ToolConfig{
@@ -296,7 +304,10 @@ func TestDebounceFeatures(t *testing.T) {
func TestDebounceConfiguration(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
// Test default debounce time
info := consumer.GetDebounceInfo()
@@ -329,7 +340,10 @@ func TestDebounceConfiguration(t *testing.T) {
func TestFingerprintCalculation(t *testing.T) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
// Empty tools
fingerprint1 := consumer.calculateFingerprint([]model.ToolConfig{})
@@ -362,10 +376,10 @@ func TestFingerprintCalculation(t *testing.T) {
//
=============================================================================
func TestIntegration(t *testing.T) {
- resetSingletons()
+ ResetGlobalState()
registry := GetOrInitRegistry()
- consumer := GetOrInitDynamic()
+ consumer := GetOrInitDynamicConsumer()
// Verify initial state
assert.Empty(t, registry.ListTools())
@@ -402,7 +416,10 @@ func TestIntegration(t *testing.T) {
func BenchmarkApplyMcpServerConfig(b *testing.B) {
registry := NewToolRegistry()
- consumer := NewDynamicConsumer(registry)
+ sm := transport.NewSessionManager()
+ defer sm.Stop()
+ sseHandler := transport.NewSSEHandler(sm)
+ consumer := NewDynamicConsumer(registry, sm, sseHandler)
config := createTestMcpServerConfig([]model.ToolConfig{
createTestToolConfig("tool1", "First tool"),
diff --git a/pkg/filter/mcp/mcpserver/filter.go
b/pkg/filter/mcp/mcpserver/filter.go
index 1ca5908e..1a56cb75 100644
--- a/pkg/filter/mcp/mcpserver/filter.go
+++ b/pkg/filter/mcp/mcpserver/filter.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/http"
+ "time"
)
import (
@@ -29,8 +30,10 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
@@ -45,10 +48,13 @@ type (
// MCPServerFilter is a filter that handles MCP protocol.
MCPServerFilter struct {
- cfg *model.McpServerConfig
- registry *ToolRegistry
- errorHandler *ErrorHandler
- responseBuilder *ResponseBuilder
+ cfg *model.McpServerConfig
+ registry *ToolRegistry
+ errorHandler *ErrorHandler
+ responseBuilder *ResponseBuilder
+ sessionManager *transport.SessionManager
+ sseHandler *transport.SSEHandler
+ contentNegotiator *transport.ContentNegotiator
}
)
@@ -97,11 +103,19 @@ func (f *FilterFactory) Config() any {
// PrepareFilterChain prepares the filter chain
func (f *FilterFactory) PrepareFilterChain(_ *contexthttp.HttpContext, chain
filter.FilterChain) error {
+ // Get global session manager singleton
+ sessionManager := GetOrInitSessionManager()
+ sseHandler := transport.NewSSEHandler(sessionManager)
+ contentNegotiator := transport.NewContentNegotiator()
+
mcpFilter := &MCPServerFilter{
- cfg: f.cfg,
- registry: f.registry,
- errorHandler: NewErrorHandler(),
- responseBuilder: NewResponseBuilder(),
+ cfg: f.cfg,
+ registry: f.registry,
+ errorHandler: NewErrorHandler(),
+ responseBuilder: NewResponseBuilder(),
+ sessionManager: sessionManager,
+ sseHandler: sseHandler,
+ contentNegotiator: contentNegotiator,
}
chain.AppendDecodeFilters(mcpFilter)
chain.AppendEncodeFilters(mcpFilter) // Add to Encode chain
@@ -118,36 +132,25 @@ func (f *MCPServerFilter) Decode(ctx
*contexthttp.HttpContext) filter.FilterStat
// Create MCP context wrapper
mcpCtx := NewMCPContext(ctx)
- // Read request body
- body, err := io.ReadAll(ctx.Request.Body)
- if err != nil {
- logger.Errorf("[dubbo-go-pixiu] mcp server failed to read
request body: %v", err)
- return f.errorHandler.SendInternalError(mcpCtx, nil, "failed to
read request body")
- }
+ // Parse HTTP headers
+ mcpCtx.ParseAndSetProtocolVersionHeader()
+ mcpCtx.ParseAndSetSessionHeader()
+ mcpCtx.ParseAndSetAcceptHeader()
- // Parse JSON-RPC request
- var jsonrpcReq mcp.JSONRPCRequest
- if err := json.Unmarshal(body, &jsonrpcReq); err != nil {
- logger.Errorf("[dubbo-go-pixiu] mcp server failed to parse
JSON-RPC request: %v", err)
- return f.errorHandler.SendInternalError(mcpCtx, nil, "invalid
JSON-RPC request")
+ // Protocol version is logged for debugging
+ version := mcpCtx.ProtocolVersion()
+ if version != "" {
+ logger.Debugf("[dubbo-go-pixiu] mcp server client protocol
version: %s", version)
}
- logger.Infof("[dubbo-go-pixiu] mcp server received request: %s (id:
%v)", jsonrpcReq.Method, jsonrpcReq.ID)
-
- // Store information in MCP context
- mcpCtx.SetMCPMethod(jsonrpcReq.Method)
- mcpCtx.SetMCPRequestID(jsonrpcReq.ID)
-
- // Handle terminal methods (methods that don't need forwarding to
backend)
- if f.isTerminalMethod(jsonrpcReq.Method) {
- return f.handleTerminalMethod(mcpCtx, jsonrpcReq)
- } else if jsonrpcReq.Method == string(mcp.MethodToolsCall) {
- // Tool call will be processed in Encode stage (IsMCPToolCall()
checks method)
- return f.handleToolCall(mcpCtx, jsonrpcReq)
- } else {
- // Unknown method
- logger.Warnf("[dubbo-go-pixiu] mcp server unsupported method:
%s", jsonrpcReq.Method)
- return f.errorHandler.SendMethodNotFound(mcpCtx, jsonrpcReq.ID)
+ // Dispatch based on HTTP method
+ switch ctx.Request.Method {
+ case constant.Get:
+ return f.handleGetRequest(mcpCtx)
+ case constant.Post:
+ return f.handlePostRequest(mcpCtx)
+ default:
+ return f.sendMethodNotAllowed(mcpCtx)
}
}
@@ -163,33 +166,6 @@ func (f *MCPServerFilter) isTerminalMethod(method string)
bool {
}
}
-// handleTerminalMethod handles terminal methods
-func (f *MCPServerFilter) handleTerminalMethod(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
- switch req.Method {
- case string(mcp.MethodInitialize):
- return f.handleInitialize(ctx, req)
- case string(mcp.MethodToolsList):
- return f.handleToolsList(ctx, req)
- case string(mcp.MethodResourcesList):
- return f.handleResourcesList(ctx, req)
- case string(mcp.MethodResourcesRead):
- return f.handleResourceRead(ctx, req)
- case "resources/templates/list":
- return f.handleResourceTemplatesList(ctx, req)
- case string(mcp.MethodPromptsList):
- return f.handlePromptsList(ctx, req)
- case string(mcp.MethodPromptsGet):
- return f.handlePromptsGet(ctx, req)
- case "notifications/initialized":
- return f.handleNotificationsInitialized(ctx, req)
- case string(mcp.MethodPing):
- return f.handlePing(ctx, req)
- default:
- logger.Warnf("[dubbo-go-pixiu] mcp server unsupported method:
%s", req.Method)
- return f.errorHandler.SendMethodNotFound(ctx, req.ID)
- }
-}
-
// Encode processes outgoing HTTP responses.
func (f *MCPServerFilter) Encode(ctx *contexthttp.HttpContext)
filter.FilterStatus {
// Create MCP context wrapper and load stored MCP data
@@ -235,3 +211,327 @@ func (f *MCPServerFilter) sendJSONResponse(ctx
*MCPContext, response any) filter
ctx.SendLocalReply(http.StatusOK, responseBody)
return filter.Stop
}
+
+// handleGetRequest handles HTTP GET requests for SSE stream establishment
+func (f *MCPServerFilter) handleGetRequest(ctx *MCPContext)
filter.FilterStatus {
+ logger.Infof("[dubbo-go-pixiu] mcp server handling GET request for SSE
stream")
+
+ // Validate Accept header includes text/event-stream
+ if !ctx.AcceptSSE() {
+ logger.Warnf("[dubbo-go-pixiu] mcp server GET request must
accept text/event-stream")
+ return f.sendNotAcceptable(ctx, "GET request must accept
text/event-stream")
+ }
+
+ // Get or create session
+ sessionIDHeader := ctx.SessionID()
+ session, isNewSession := f.sessionManager.EnsureSession(sessionIDHeader)
+ ctx.SetSessionID(session.ID)
+
+ // Create io.Pipe for SSE message transport
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+
+ // Create virtual HTTP response with pipe as body
+ virtualResp := &http.Response{
+ StatusCode: http.StatusOK,
+ Header: http.Header{
+ constant.HeaderKeyContextType:
[]string{constant.HeaderValueTextEventStream},
+ constant.HeaderKeyCacheControl:
[]string{constant.HeaderValueNoCache},
+ constant.HeaderKeyConnection:
[]string{constant.HeaderValueKeepAlive},
+ constant.HeaderKeyMCPSessionId:
[]string{session.ID},
+ constant.HeaderKeyAccessControlAllowOrigin:
[]string{constant.HeaderValueAll},
+ },
+ Body: pipeReader,
+ }
+
+ // Set SourceResp to let buildTargetResponse convert to StreamResponse
+ ctx.SourceResp = virtualResp
+ ctx.StatusCode(http.StatusOK)
+
+ // Start background goroutine to maintain the SSE connection
+ go f.maintainSSEPipe(ctx, session)
+
+ if isNewSession {
+ logger.Infof("[dubbo-go-pixiu] mcp server established new SSE
stream for session: %s", session.ID)
+ } else {
+ logger.Infof("[dubbo-go-pixiu] mcp server resumed SSE stream
for existing session: %s", session.ID)
+ }
+
+ // Return Stop to skip remaining filters and backend call
+ return filter.Stop
+}
+
+// handlePostRequest handles HTTP POST requests with JSON-RPC messages
+func (f *MCPServerFilter) handlePostRequest(ctx *MCPContext)
filter.FilterStatus {
+ logger.Debugf("[dubbo-go-pixiu] mcp server handling POST request")
+
+ // Read request body
+ body, err := io.ReadAll(ctx.Request.Body)
+ if err != nil {
+ logger.Errorf("[dubbo-go-pixiu] mcp server failed to read
request body: %v", err)
+ return f.errorHandler.SendInternalError(ctx, nil, "failed to
read request body")
+ }
+
+ // Parse JSON-RPC request
+ var jsonrpcReq mcp.JSONRPCRequest
+ if err := json.Unmarshal(body, &jsonrpcReq); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] mcp server failed to parse
JSON-RPC request: %v", err)
+ return f.errorHandler.SendInternalError(ctx, nil, "invalid
JSON-RPC request")
+ }
+
+ logger.Infof("[dubbo-go-pixiu] mcp server received POST request: %s
(id: %v)", jsonrpcReq.Method, jsonrpcReq.ID)
+
+ // Store information in MCP context
+ ctx.SetMCPMethod(jsonrpcReq.Method)
+ ctx.SetMCPRequestID(jsonrpcReq.ID)
+
+ // Determine response format based on content negotiation
+ sessionID := ctx.SessionID()
+ hasSession := sessionID != "" && f.sessionExists(sessionID)
+ responseFormat := f.contentNegotiator.NegotiateResponse(
+ ctx.Request.Header.Get(constant.HeaderKeyAccept), hasSession)
+
+ // Store response format decision in context for later use
+ ctx.StoreMCPDataInParams()
+
+ // Handle different request types
+ if f.isTerminalMethod(jsonrpcReq.Method) {
+ return f.handleTerminalMethodWithNegotiation(ctx, jsonrpcReq,
responseFormat)
+ } else if jsonrpcReq.Method == string(mcp.MethodToolsCall) {
+ return f.handleToolCallWithNegotiation(ctx, jsonrpcReq,
responseFormat)
+ } else {
+ // Unknown method
+ logger.Warnf("[dubbo-go-pixiu] mcp server unsupported method:
%s", jsonrpcReq.Method)
+ return f.errorHandler.SendMethodNotFound(ctx, jsonrpcReq.ID)
+ }
+}
+
+// handleTerminalMethodWithNegotiation handles terminal methods with response
format negotiation
+func (f *MCPServerFilter) handleTerminalMethodWithNegotiation(ctx *MCPContext,
req mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
+ // Special case 1: initialize always returns JSON immediately (no SSE
option per MCP spec)
+ if req.Method == string(mcp.MethodInitialize) {
+ return f.handleInitialize(ctx, req)
+ }
+
+ // Special case 2: notifications/initialized always returns 202
Accepted with no body
+ if req.Method == "notifications/initialized" {
+ logger.Infof("[dubbo-go-pixiu] mcp server received initialized
notification, returning 202 Accepted")
+ ctx.SendLocalReply(http.StatusAccepted, nil)
+ return filter.Stop
+ }
+
+ // For other terminal methods, dispatch to specific handlers with
response format
+ switch req.Method {
+ case string(mcp.MethodToolsList):
+ return f.handleToolsList(ctx, req, responseFormat)
+ case string(mcp.MethodResourcesList):
+ return f.handleResourcesList(ctx, req, responseFormat)
+ case string(mcp.MethodResourcesRead):
+ return f.handleResourceRead(ctx, req, responseFormat)
+ case "resources/templates/list":
+ return f.handleResourceTemplatesList(ctx, req, responseFormat)
+ case string(mcp.MethodPromptsList):
+ return f.handlePromptsList(ctx, req, responseFormat)
+ case string(mcp.MethodPromptsGet):
+ return f.handlePromptsGet(ctx, req, responseFormat)
+ case string(mcp.MethodPing):
+ return f.handlePing(ctx, req, responseFormat)
+ default:
+ logger.Warnf("[dubbo-go-pixiu] mcp server unsupported terminal
method: %s", req.Method)
+ return f.errorHandler.SendMethodNotFound(ctx, req.ID)
+ }
+}
+
+// handleToolCallWithNegotiation handles tool calls with response format
negotiation
+func (f *MCPServerFilter) handleToolCallWithNegotiation(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
+ // For notifications (no response needed), send 202 Accepted immediately
+ if responseFormat == transport.ResponseFormatAccepted {
+ ctx.SendLocalReply(http.StatusAccepted, nil)
+ return filter.Stop
+ }
+
+ // For tool calls, we need to forward to backend, so continue with
existing logic
+ // but store the response format for use in Encode stage
+ return f.handleToolCall(ctx, req)
+}
+
+// sendResponseWithFormat sends response in the negotiated format
+func (f *MCPServerFilter) sendResponseWithFormat(ctx *MCPContext, response
any, format transport.ResponseFormat) filter.FilterStatus {
+ switch format {
+ case transport.ResponseFormatJSON:
+ return f.sendJSONResponse(ctx, response)
+ case transport.ResponseFormatSSE:
+ return f.sendSSEResponse(ctx, response)
+ case transport.ResponseFormatAccepted:
+ ctx.SendLocalReply(http.StatusAccepted, nil)
+ return filter.Stop
+ default:
+ return f.sendJSONResponse(ctx, response)
+ }
+}
+
+// sendSSEResponse sends response via SSE stream
+func (f *MCPServerFilter) sendSSEResponse(ctx *MCPContext, response any)
filter.FilterStatus {
+ sessionID := ctx.SessionID()
+ if sessionID == "" {
+ // No session, fall back to JSON
+ return f.sendJSONResponse(ctx, response)
+ }
+
+ session, exists := f.sessionManager.Session(sessionID)
+ if !exists {
+ // Session not found, fall back to JSON
+ logger.Warnf("[dubbo-go-pixiu] mcp server session not found:
%s, falling back to JSON", sessionID)
+ return f.sendJSONResponse(ctx, response)
+ }
+
+ // Send via SSE
+ if err := f.sseHandler.SendSSEMessage(session, response); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] mcp server failed to send SSE
message: %v", err)
+ // Fall back to JSON
+ return f.sendJSONResponse(ctx, response)
+ }
+
+ // SSE message sent successfully, return 202 Accepted per MCP spec
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent response via SSE for
session: %s", sessionID)
+ ctx.SendLocalReply(http.StatusAccepted, nil)
+ return filter.Stop
+}
+
+// sessionExists checks if a session exists
+func (f *MCPServerFilter) sessionExists(sessionID string) bool {
+ _, exists := f.sessionManager.Session(sessionID)
+ return exists
+}
+
+// sendBadRequest sends a 400 Bad Request response
+func (f *MCPServerFilter) sendBadRequest(ctx *MCPContext, message string)
filter.FilterStatus {
+ logger.Warnf("[dubbo-go-pixiu] mcp server bad request: %s", message)
+ ctx.SendLocalReply(http.StatusBadRequest, []byte(message))
+ return filter.Stop
+}
+
+// sendMethodNotAllowed sends a 405 Method Not Allowed response
+func (f *MCPServerFilter) sendMethodNotAllowed(ctx *MCPContext)
filter.FilterStatus {
+ logger.Warnf("[dubbo-go-pixiu] mcp server method not allowed: %s",
ctx.Request.Method)
+ ctx.Writer.Header().Set("Allow", "GET, POST")
+ ctx.SendLocalReply(http.StatusMethodNotAllowed, []byte("Method Not
Allowed"))
+ return filter.Stop
+}
+
+// sendNotAcceptable sends a 406 Not Acceptable response
+func (f *MCPServerFilter) sendNotAcceptable(ctx *MCPContext, message string)
filter.FilterStatus {
+ logger.Warnf("[dubbo-go-pixiu] mcp server not acceptable: %s", message)
+ ctx.SendLocalReply(http.StatusNotAcceptable, []byte(message))
+ return filter.Stop
+}
+
+// sendInternalError sends a 500 Internal Server Error response
+func (f *MCPServerFilter) sendInternalError(ctx *MCPContext, message string)
filter.FilterStatus {
+ logger.Errorf("[dubbo-go-pixiu] mcp server internal error: %s", message)
+ ctx.SendLocalReply(http.StatusInternalServerError, []byte("Internal
Server Error"))
+ return filter.Stop
+}
+
+// maintainSSEPipe maintains the SSE pipe connection with keepalive
+func (f *MCPServerFilter) maintainSSEPipe(ctx *MCPContext, session
*transport.MCPSession) {
+ // Ensure cleanup on exit
+ defer func() {
+ if session.PipeWriter != nil {
+ session.PipeWriter.Close()
+ }
+ f.sessionManager.RemoveSession(session.ID)
+ logger.Debugf("[dubbo-go-pixiu] mcp server SSE pipe maintenance
ended for session: %s", session.ID)
+ }()
+
+ ticker := time.NewTicker(transport.KeepaliveInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ // Send keepalive comment (ignored by SSE clients)
+ keepalive :=
f.sseHandler.FormatSSEKeepalive(time.Now().Unix())
+ if _, err :=
session.PipeWriter.Write([]byte(keepalive)); err != nil {
+ logger.Warnf("[dubbo-go-pixiu] mcp server
keepalive write failed for session %s: %v", session.ID, err)
+ return
+ }
+ session.LastActivity = time.Now()
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent
keepalive for session: %s", session.ID)
+
+ case <-session.Done:
+ // Server-initiated close
+ logger.Infof("[dubbo-go-pixiu] mcp server closing SSE
stream (server initiated) for session: %s", session.ID)
+ return
+
+ case <-ctx.Ctx.Done():
+ // Client disconnected
+ logger.Infof("[dubbo-go-pixiu] mcp server closing SSE
stream (client disconnected) for session: %s", session.ID)
+ return
+ }
+ }
+}
+
+// SendServerNotification sends a server notification to the client via SSE
stream
+func (f *MCPServerFilter) SendServerNotification(sessionID string, method
string, params map[string]any) error {
+ session, exists := f.sessionManager.Session(sessionID)
+ if !exists {
+ return fmt.Errorf("session not found: %s", sessionID)
+ }
+
+ if session.PipeWriter == nil {
+ return fmt.Errorf("SSE pipe not established for session: %s",
sessionID)
+ }
+
+ // Use ResponseBuilder to create notification
+ notification := f.responseBuilder.ServerNotification(method, params)
+
+ // Send via SSE
+ if err := f.sendMessageToSSEPipe(session, notification); err != nil {
+ return err
+ }
+
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent notification to session
%s: %s", sessionID, method)
+ return nil
+}
+
+// SendServerRequest sends a server request to the client via SSE stream
+func (f *MCPServerFilter) SendServerRequest(sessionID string, id any, method
string, params any) error {
+ session, exists := f.sessionManager.Session(sessionID)
+ if !exists {
+ return fmt.Errorf("session not found: %s", sessionID)
+ }
+
+ if session.PipeWriter == nil {
+ return fmt.Errorf("SSE pipe not established for session: %s",
sessionID)
+ }
+
+ // Use ResponseBuilder to create request
+ request := f.responseBuilder.ServerRequest(id, method, params)
+
+ // Send via SSE
+ if err := f.sendMessageToSSEPipe(session, request); err != nil {
+ return err
+ }
+
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent request to session %s:
%s (id: %v)", sessionID, method, id)
+ return nil
+}
+
+// sendMessageToSSEPipe sends a message to the SSE pipe
+func (f *MCPServerFilter) sendMessageToSSEPipe(session *transport.MCPSession,
message any) error {
+ messageJSON, err := json.Marshal(message)
+ if err != nil {
+ return fmt.Errorf("failed to marshal message: %w", err)
+ }
+
+ // Use SSEHandler to format the message
+ sseData := f.sseHandler.FormatSSEMessage(string(messageJSON))
+
+ if _, err := session.PipeWriter.Write([]byte(sseData)); err != nil {
+ return fmt.Errorf("failed to write to SSE pipe: %w", err)
+ }
+
+ session.LastActivity = time.Now()
+ return nil
+}
diff --git a/pkg/filter/mcp/mcpserver/filter_sse_test.go
b/pkg/filter/mcp/mcpserver/filter_sse_test.go
new file mode 100644
index 00000000..c42474ad
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/filter_sse_test.go
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+ "context"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func TestHandleGetRequest_SSEStream(t *testing.T) {
+ // Create filter
+ mcpFilter := createTestFilter(t)
+
+ // Create GET request
+ req := httptest.NewRequest(constant.Get, "/mcp", nil)
+ req.Header.Set(constant.HeaderKeyAccept,
constant.HeaderValueTextEventStream)
+ req.Header.Set(constant.HeaderKeyMCPProtocolVersion,
constant.MCPProtocolVersion20250618)
+
+ recorder := httptest.NewRecorder()
+ ctx := createTestContext(req, recorder)
+ mcpCtx := NewMCPContext(ctx)
+
+ // Parse headers
+ mcpCtx.ParseAndSetProtocolVersionHeader()
+ mcpCtx.ParseAndSetSessionHeader()
+ mcpCtx.ParseAndSetAcceptHeader()
+
+ // Execute handleGetRequest
+ status := mcpFilter.handleGetRequest(mcpCtx)
+
+ // Verify filter status
+ if status != filter.Stop {
+ t.Errorf("Expected filter.Stop, got %v", status)
+ }
+
+ // Verify SourceResp is set
+ if ctx.SourceResp == nil {
+ t.Fatal("SourceResp should be set")
+ }
+
+ httpResp, ok := ctx.SourceResp.(*http.Response)
+ if !ok {
+ t.Fatal("SourceResp should be *http.Response")
+ }
+
+ // Verify response headers
+ if httpResp.StatusCode != http.StatusOK {
+ t.Errorf("Expected status 200, got %d", httpResp.StatusCode)
+ }
+
+ contentType := httpResp.Header.Get(constant.HeaderKeyContextType)
+ if contentType != constant.HeaderValueTextEventStream {
+ t.Errorf("Expected Content-Type %s, got %s",
constant.HeaderValueTextEventStream, contentType)
+ }
+
+ sessionID := httpResp.Header.Get(constant.HeaderKeyMCPSessionId)
+ if sessionID == "" {
+ t.Error("Mcp-Session-Id should be set")
+ }
+
+ // Verify session was created
+ session, exists := mcpFilter.sessionManager.Session(sessionID)
+ if !exists {
+ t.Error("Session should be created")
+ }
+ if session.PipeWriter == nil {
+ t.Error("Session PipeWriter should be set")
+ }
+
+ // Verify response body is pipe reader
+ if httpResp.Body == nil {
+ t.Error("Response body should be set")
+ }
+
+ // Cleanup
+ mcpFilter.sessionManager.Stop()
+}
+
+func TestHandleGetRequest_MissingAcceptHeader(t *testing.T) {
+ mcpFilter := createTestFilter(t)
+ defer mcpFilter.sessionManager.Stop()
+
+ // Create GET request without Accept header
+ req := httptest.NewRequest(constant.Get, "/mcp", nil)
+ recorder := httptest.NewRecorder()
+ ctx := createTestContext(req, recorder)
+ mcpCtx := NewMCPContext(ctx)
+
+ mcpCtx.ParseAndSetAcceptHeader()
+
+ status := mcpFilter.handleGetRequest(mcpCtx)
+
+ // Should return error
+ if status != filter.Stop {
+ t.Error("Expected filter.Stop for missing Accept header")
+ }
+
+ // Check for error response
+ if recorder.Code != http.StatusNotAcceptable {
+ t.Errorf("Expected status 406, got %d", recorder.Code)
+ }
+}
+
+func TestHandleGetRequest_ResumeExistingSession(t *testing.T) {
+ mcpFilter := createTestFilter(t)
+ defer mcpFilter.sessionManager.Stop()
+
+ // Create first session
+ session1, _ := mcpFilter.sessionManager.EnsureSession("")
+ sessionID := session1.ID
+
+ // Create GET request with existing session ID
+ req := httptest.NewRequest(constant.Get, "/mcp", nil)
+ req.Header.Set(constant.HeaderKeyAccept,
constant.HeaderValueTextEventStream)
+ req.Header.Set(constant.HeaderKeyMCPSessionId, sessionID)
+
+ recorder := httptest.NewRecorder()
+ ctx := createTestContext(req, recorder)
+ mcpCtx := NewMCPContext(ctx)
+
+ mcpCtx.ParseAndSetSessionHeader()
+ mcpCtx.ParseAndSetAcceptHeader()
+
+ status := mcpFilter.handleGetRequest(mcpCtx)
+
+ if status != filter.Stop {
+ t.Errorf("Expected filter.Stop, got %v", status)
+ }
+
+ // Verify same session is reused
+ session2, exists := mcpFilter.sessionManager.Session(sessionID)
+ if !exists {
+ t.Error("Session should exist")
+ }
+ if session1.ID != session2.ID {
+ t.Error("Should reuse existing session")
+ }
+}
+
+func TestMaintainSSEPipe_Keepalive(t *testing.T) {
+ mcpFilter := createTestFilter(t)
+ defer mcpFilter.sessionManager.Stop()
+
+ // Create session with pipe
+ session, _ := mcpFilter.sessionManager.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+
+ // Create context with timeout
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ httpCtx := &contexthttp.HttpContext{
+ Ctx: ctx,
+ }
+ mcpCtx := NewMCPContext(httpCtx)
+
+ // Start maintainSSEPipe with very short interval for testing
+ // Note: This test uses production KeepaliveInterval (30s), so we won't
actually receive keepalive in 2s
+ go mcpFilter.maintainSSEPipe(mcpCtx, session)
+
+ // Read from pipe in background
+ dataCh := make(chan string, 10)
+ go func() {
+ buf := make([]byte, 1024)
+ for {
+ n, err := pipeReader.Read(buf)
+ if err != nil {
+ return
+ }
+ if n > 0 {
+ dataCh <- string(buf[:n])
+ }
+ }
+ }()
+
+ // Wait a bit to see if we receive anything (we shouldn't with 30s
interval)
+ select {
+ case data := <-dataCh:
+ // If we somehow received data (unlikely with 30s interval),
verify it's keepalive
+ if !strings.HasPrefix(data, ":") {
+ t.Errorf("Expected keepalive comment, got: %s", data)
+ }
+ case <-time.After(500 * time.Millisecond):
+ // Expected: no data yet due to 30s interval
+ }
+
+ // Cancel context to stop maintenance
+ cancel()
+ time.Sleep(100 * time.Millisecond)
+
+ // Verify session was cleaned up
+ _, exists := mcpFilter.sessionManager.Session(session.ID)
+ if exists {
+ t.Error("Session should be removed after context cancellation")
+ }
+}
+
+func TestSendServerNotification(t *testing.T) {
+ mcpFilter := createTestFilter(t)
+ defer mcpFilter.sessionManager.Stop()
+
+ // Create session with pipe
+ session, _ := mcpFilter.sessionManager.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+ sessionID := session.ID
+
+ // Send notification in goroutine
+ params := map[string]any{
+ "message": "test notification",
+ "value": 123,
+ }
+
+ errCh := make(chan error, 1)
+ go func() {
+ errCh <- mcpFilter.SendServerNotification(sessionID,
"notifications/test", params)
+ }()
+
+ // Read from pipe
+ buf := make([]byte, 1024)
+ n, err := pipeReader.Read(buf)
+ if err != nil {
+ t.Fatalf("Failed to read from pipe: %v", err)
+ }
+
+ received := string(buf[:n])
+
+ // Verify SSE format
+ if !strings.HasPrefix(received, "data:") {
+ t.Error("Should start with 'data:'")
+ }
+ if !strings.HasSuffix(received, "\n\n") {
+ t.Error("Should end with \\n\\n")
+ }
+ if !strings.Contains(received, "notifications/test") {
+ t.Error("Should contain method name")
+ }
+ if !strings.Contains(received, "jsonrpc") {
+ t.Error("Should contain jsonrpc field")
+ }
+
+ // Verify no error
+ if err := <-errCh; err != nil {
+ t.Errorf("SendServerNotification failed: %v", err)
+ }
+
+ pipeWriter.Close()
+ pipeReader.Close()
+}
+
+func TestSendServerNotification_NoSession(t *testing.T) {
+ mcpFilter := createTestFilter(t)
+ defer mcpFilter.sessionManager.Stop()
+
+ err := mcpFilter.SendServerNotification("non-existent-id", "test",
map[string]any{})
+ if err == nil {
+ t.Error("Expected error for non-existent session")
+ }
+ if !strings.Contains(err.Error(), "session not found") {
+ t.Errorf("Unexpected error message: %v", err)
+ }
+}
+
+func TestSendServerRequest(t *testing.T) {
+ mcpFilter := createTestFilter(t)
+ defer mcpFilter.sessionManager.Stop()
+
+ // Create session with pipe
+ session, _ := mcpFilter.sessionManager.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+ sessionID := session.ID
+
+ // Send request in goroutine
+ params := map[string]any{"key": "value"}
+
+ errCh := make(chan error, 1)
+ go func() {
+ errCh <- mcpFilter.SendServerRequest(sessionID, 123,
"prompts/get", params)
+ }()
+
+ // Read from pipe
+ buf := make([]byte, 1024)
+ n, err := pipeReader.Read(buf)
+ if err != nil {
+ t.Fatalf("Failed to read from pipe: %v", err)
+ }
+
+ received := string(buf[:n])
+
+ // Verify SSE format
+ if !strings.HasPrefix(received, "data:") {
+ t.Error("Should start with 'data:'")
+ }
+ if !strings.Contains(received, "prompts/get") {
+ t.Error("Should contain method name")
+ }
+ if !strings.Contains(received, `"id"`) {
+ t.Error("Should contain id field")
+ }
+
+ // Verify no error
+ if err := <-errCh; err != nil {
+ t.Errorf("SendServerRequest failed: %v", err)
+ }
+
+ pipeWriter.Close()
+ pipeReader.Close()
+}
+
+// Note: TestValidateMCPProtocolVersion removed - version negotiation now
happens
+// during initialize request/response per MCP spec, not at HTTP header level.
+
+// Helper functions
+
+func createTestFilter(t *testing.T) *MCPServerFilter {
+ cfg := &model.McpServerConfig{
+ ServerInfo: model.ServerInfo{
+ Name: "Test Server",
+ Version: "1.0.0",
+ },
+ Endpoint: "/mcp",
+ Tools: []model.ToolConfig{},
+ }
+
+ factory := &FilterFactory{cfg: cfg}
+ if err := factory.Apply(); err != nil {
+ t.Fatalf("Failed to apply filter factory: %v", err)
+ }
+
+ sessionManager := transport.NewSessionManager()
+ sseHandler := transport.NewSSEHandler(sessionManager)
+ contentNegotiator := transport.NewContentNegotiator()
+
+ return &MCPServerFilter{
+ cfg: cfg,
+ registry: factory.registry,
+ errorHandler: NewErrorHandler(),
+ responseBuilder: NewResponseBuilder(),
+ sessionManager: sessionManager,
+ sseHandler: sseHandler,
+ contentNegotiator: contentNegotiator,
+ }
+}
+
+func createTestContext(req *http.Request, recorder *httptest.ResponseRecorder)
*contexthttp.HttpContext {
+ return &contexthttp.HttpContext{
+ Request: req,
+ Writer: recorder,
+ Ctx: context.Background(),
+ Params: make(map[string]any),
+ }
+}
diff --git a/pkg/filter/mcp/mcpserver/globals.go
b/pkg/filter/mcp/mcpserver/globals.go
new file mode 100644
index 00000000..4657d375
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/globals.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mcpserver
+
+import (
+ "sync"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// Global singletons for MCP server components
+var (
+ globalRegistry *ToolRegistry
+ globalDynamic *DynamicConsumer
+ globalSessionManager *transport.SessionManager
+
+ // sync.Once variables for thread-safe singleton initialization
+ registryOnce sync.Once
+ dynamicOnce sync.Once
+ sessionManagerOnce sync.Once
+)
+
+// GetOrInitRegistry returns the global tool registry singleton
+func GetOrInitRegistry() *ToolRegistry {
+ registryOnce.Do(func() {
+ globalRegistry = NewToolRegistry()
+ logger.Infof("[dubbo-go-pixiu] mcp server initialized global
tool registry")
+ })
+ return globalRegistry
+}
+
+// GetOrInitDynamicConsumer returns the global dynamic consumer singleton
+func GetOrInitDynamicConsumer() *DynamicConsumer {
+ dynamicOnce.Do(func() {
+ reg := GetOrInitRegistry()
+ sm := GetOrInitSessionManager()
+ sseHandler := transport.NewSSEHandler(sm)
+ globalDynamic = NewDynamicConsumer(reg, sm, sseHandler)
+ logger.Infof("[dubbo-go-pixiu] mcp server initialized global
dynamic consumer")
+ })
+ return globalDynamic
+}
+
+// GetOrInitSessionManager returns the global session manager singleton
+func GetOrInitSessionManager() *transport.SessionManager {
+ sessionManagerOnce.Do(func() {
+ globalSessionManager = transport.NewSessionManager()
+ logger.Infof("[dubbo-go-pixiu] mcp server initialized global
session manager")
+ })
+ return globalSessionManager
+}
+
+// ResetGlobalState resets all global singletons (for testing)
+func ResetGlobalState() {
+ globalRegistry = nil
+ globalDynamic = nil
+ if globalSessionManager != nil {
+ globalSessionManager.Stop()
+ }
+ globalSessionManager = nil
+
+ registryOnce = sync.Once{}
+ dynamicOnce = sync.Once{}
+ sessionManagerOnce = sync.Once{}
+
+ logger.Debugf("[dubbo-go-pixiu] mcp server global state reset")
+}
diff --git a/pkg/filter/mcp/mcpserver/handlers.go
b/pkg/filter/mcp/mcpserver/handlers.go
index 21f44e52..ae0a1f4c 100644
--- a/pkg/filter/mcp/mcpserver/handlers.go
+++ b/pkg/filter/mcp/mcpserver/handlers.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ "github.com/apache/dubbo-go-pixiu/pkg/filter/mcp/mcpserver/transport"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
@@ -50,13 +51,34 @@ const (
// handleInitialize handles the initialize method
func (f *MCPServerFilter) handleInitialize(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
- // Build server capabilities using mcp-go structures
+ // Parse client's protocol version from request params
+ var initParams struct {
+ ProtocolVersion string `json:"protocolVersion"`
+ ClientInfo struct {
+ Name string `json:"name"`
+ Version string `json:"version"`
+ } `json:"clientInfo"`
+ }
+
+ if req.Params != nil {
+ if paramsBytes, err := json.Marshal(req.Params); err == nil {
+ json.Unmarshal(paramsBytes, &initParams)
+ }
+ }
+
+ clientVersion := initParams.ProtocolVersion
+ if clientVersion == "" {
+ clientVersion = ctx.ProtocolVersion()
+ }
+
+ logger.Infof("[dubbo-go-pixiu] mcp server initialize: client=%s
version=%s, server will respond with=%s",
+ initParams.ClientInfo.Name, clientVersion,
constant.MCPProtocolVersion20250618)
+
capabilities := mcp.ServerCapabilities{
Tools: &struct {
ListChanged bool `json:"listChanged,omitempty"`
}{
- // disable listChanged notifications (not implemented)
- ListChanged: false,
+ ListChanged: true,
},
Resources: &struct {
Subscribe bool `json:"subscribe,omitempty"`
@@ -72,26 +94,40 @@ func (f *MCPServerFilter) handleInitialize(ctx *MCPContext,
req mcp.JSONRPCReque
},
}
- // Build server info using mcp-go structures
serverInfo := mcp.Implementation{
Name: f.cfg.ServerInfo.Name,
Version: f.cfg.ServerInfo.Version,
}
- // Create initialization result using mcp-go API
instructions := f.cfg.ServerInfo.Instructions
if instructions == "" {
instructions = "This MCP server provides API access through
tools, documentation through resources, and AI assistance through prompts."
}
- result := mcp.NewInitializeResult(mcp.LATEST_PROTOCOL_VERSION,
capabilities, serverInfo, instructions)
+ result := mcp.NewInitializeResult(constant.MCPProtocolVersion20250618,
capabilities, serverInfo, instructions)
- // Create JSON-RPC response
response := f.responseBuilder.Success(req.ID, result)
+
+ // Per MCP spec: assign a session ID at initialization time for
Streamable HTTP transport
+ // This enables clients to use the session for SSE-based responses
+ sessionIDHeader := ctx.SessionID()
+ session, _ := f.sessionManager.EnsureSession(sessionIDHeader)
+
+ // Add Mcp-Session-Id header to the response
+ ctx.AddHeader(constant.HeaderKeyMCPSessionId, session.ID)
+
+ logger.Infof("[dubbo-go-pixiu] mcp server created session for client:
%s", session.ID)
+
return f.sendJSONResponse(ctx, response)
}
// handleToolsList handles the tools/list method using mcp-go APIs
-func (f *MCPServerFilter) handleToolsList(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleToolsList(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
+ response := f.buildToolsListResponseObject(req)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
+}
+
+// buildToolsListResponseObject builds the tools/list response object (for SSE)
+func (f *MCPServerFilter) buildToolsListResponseObject(req mcp.JSONRPCRequest)
mcp.JSONRPCResponse {
// Read tools from registry to reflect dynamic updates
toolCfgs := f.registry.ListTools()
tools := make([]mcp.Tool, 0, len(toolCfgs))
@@ -122,10 +158,9 @@ func (f *MCPServerFilter) handleToolsList(ctx *MCPContext,
req mcp.JSONRPCReques
}
// Build standard MCP tools list response using mcp-go structures
- result := mcp.NewListToolsResult(tools, "") // empty cursor for no
pagination
+ result := mcp.NewListToolsResult(tools, "")
- response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+ return f.responseBuilder.Success(req.ID, result)
}
// buildToolParameterOptions builds the mcp.PropertyOption slice for a given
tool argument
@@ -166,46 +201,47 @@ func (f *MCPServerFilter) buildToolParameterOptions(arg
*model.ArgConfig) []mcp.
}
// handleResourcesList handles the resources/list method
-func (f *MCPServerFilter) handleResourcesList(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
- // Get all resources
+func (f *MCPServerFilter) handleResourcesList(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
mcpResources, err := f.registry.ToMCPResources()
if err != nil {
logger.Errorf("[dubbo-go-pixiu] mcp server failed to get MCP
resources: %v", err)
return f.errorHandler.SendInternalError(ctx, req.ID, "failed to
get resources")
}
- // Build resources list response using mcp-go structures
- result := mcp.NewListResourcesResult(mcpResources, "") // empty cursor
for no pagination
+ response := f.buildResourcesListResponseObject(req, mcpResources)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
+}
- response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+// buildResourcesListResponseObject builds the resources/list response object
(for SSE)
+func (f *MCPServerFilter) buildResourcesListResponseObject(req
mcp.JSONRPCRequest, mcpResources []mcp.Resource) mcp.JSONRPCResponse {
+ // Build resources list response using mcp-go structures
+ result := mcp.NewListResourcesResult(mcpResources, "")
+ return f.responseBuilder.Success(req.ID, result)
}
// handlePing handles the ping method
-func (f *MCPServerFilter) handlePing(ctx *MCPContext, req mcp.JSONRPCRequest)
filter.FilterStatus {
- logger.Debugf("[dubbo-go-pixiu] mcp server handling ping request")
-
- // Simple ping response
- result := map[string]any{}
+func (f *MCPServerFilter) handlePing(ctx *MCPContext, req mcp.JSONRPCRequest,
responseFormat transport.ResponseFormat) filter.FilterStatus {
+ response := f.buildPingResponseObject(req)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
+}
- response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+// buildPingResponseObject builds the ping response object (for SSE)
+func (f *MCPServerFilter) buildPingResponseObject(req mcp.JSONRPCRequest)
mcp.JSONRPCResponse {
+ logger.Debugf("[dubbo-go-pixiu] mcp server handling ping request")
+ return f.responseBuilder.Success(req.ID, map[string]any{})
}
// handleNotificationsInitialized handles notifications/initialized
notification
-func (f *MCPServerFilter) handleNotificationsInitialized(_ *MCPContext, _
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleNotificationsInitialized(ctx *MCPContext, _
mcp.JSONRPCRequest) filter.FilterStatus {
logger.Debugf("[dubbo-go-pixiu] mcp server received initialized
notification from client")
- // Store client initialization state
- // This notification indicates that the client has completed
initialization
- // and is ready to receive requests
-
- // For notifications, we don't send a response, just return Stop
+ // Per MCP spec, notifications MUST return 202 Accepted with no body
+ ctx.SendLocalReply(http.StatusAccepted, nil)
return filter.Stop
}
// handleResourceRead handles the resources/read method
-func (f *MCPServerFilter) handleResourceRead(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleResourceRead(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
logger.Debugf("[dubbo-go-pixiu] mcp server handling resources/read")
// Parse request parameters
@@ -245,11 +281,11 @@ func (f *MCPServerFilter) handleResourceRead(ctx
*MCPContext, req mcp.JSONRPCReq
}
response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
}
// handleResourceTemplatesList handles the resources/templates/list method
-func (f *MCPServerFilter) handleResourceTemplatesList(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handleResourceTemplatesList(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
// Get all resource templates (parameterized resource patterns)
mcpResourceTemplates, err := f.registry.ToMCPResourceTemplates()
if err != nil {
@@ -263,11 +299,11 @@ func (f *MCPServerFilter) handleResourceTemplatesList(ctx
*MCPContext, req mcp.J
}
response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
}
// handlePromptsList handles the prompts/list method
-func (f *MCPServerFilter) handlePromptsList(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handlePromptsList(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
logger.Debugf("[dubbo-go-pixiu] mcp server handling prompts/list
request")
// Get all prompts
@@ -283,11 +319,11 @@ func (f *MCPServerFilter) handlePromptsList(ctx
*MCPContext, req mcp.JSONRPCRequ
}
response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
}
// handlePromptsGet handles the prompts/get method
-func (f *MCPServerFilter) handlePromptsGet(ctx *MCPContext, req
mcp.JSONRPCRequest) filter.FilterStatus {
+func (f *MCPServerFilter) handlePromptsGet(ctx *MCPContext, req
mcp.JSONRPCRequest, responseFormat transport.ResponseFormat)
filter.FilterStatus {
logger.Debugf("[dubbo-go-pixiu] mcp server handling prompts/get
request")
// Parse request parameters
@@ -327,7 +363,7 @@ func (f *MCPServerFilter) handlePromptsGet(ctx *MCPContext,
req mcp.JSONRPCReque
}
response := f.responseBuilder.Success(req.ID, result)
- return f.sendJSONResponse(ctx, response)
+ return f.sendResponseWithFormat(ctx, response, responseFormat)
}
// buildPromptMessages builds prompt messages with parameter replacement
support
@@ -548,6 +584,25 @@ func (f *MCPServerFilter) processToolCallResponse(ctx
*MCPContext, requestID any
// sendMCPResponse sends an MCP response and updates the target response
func (f *MCPServerFilter) sendMCPResponse(ctx *MCPContext, response
mcp.JSONRPCResponse) filter.FilterStatus {
+ // Check if we should send via SSE (when session exists with active SSE
stream)
+ sessionID := ctx.SessionID()
+ if sessionID != "" {
+ session, exists := f.sessionManager.Session(sessionID)
+ if exists && session.PipeWriter != nil {
+ // Send via SSE stream
+ if sseErr := f.sseHandler.SendSSEMessage(session,
response); sseErr == nil {
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent
tool call response via SSE for session: %s", sessionID)
+ // Return 202 Accepted without body per MCP spec
+ ctx.SendLocalReply(http.StatusAccepted, nil)
+ return filter.Stop
+ } else {
+ // If SSE send failed, fall through to JSON
response
+ logger.Warnf("[dubbo-go-pixiu] mcp server SSE
send failed, falling back to JSON: %v", sseErr)
+ }
+ }
+ }
+
+ // Default: send as JSON response (no SSE stream available)
mcpResponseBody, err := json.Marshal(response)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] mcp server failed to marshal
MCP response: %v", err)
@@ -559,9 +614,6 @@ func (f *MCPServerFilter) sendMCPResponse(ctx *MCPContext,
response mcp.JSONRPCR
ctx.StatusCode(http.StatusOK)
ctx.AddHeader(constant.HeaderKeyContextType,
constant.HeaderValueApplicationJson)
- // Critical: Clear Content-Length header to prevent mismatch errors
- ctx.ClearContentLengthHeader()
-
logger.Debugf("[dubbo-go-pixiu] mcp server successfully wrapped backend
response in MCP format")
return filter.Continue
}
diff --git a/pkg/filter/mcp/mcpserver/registry.go
b/pkg/filter/mcp/mcpserver/registry.go
index 7c4f8937..dae4120c 100644
--- a/pkg/filter/mcp/mcpserver/registry.go
+++ b/pkg/filter/mcp/mcpserver/registry.go
@@ -38,11 +38,6 @@ type ToolRegistry struct {
resources map[string]model.ResourceConfig // indexed by
URI
resourceTemplates map[string]model.ResourceTemplateConfig // indexed by
name
prompts map[string]model.PromptConfig
-
- // TODO: Dynamic update support - add when integrating with Nacos
- // changeListeners []ChangeListener // change listeners
- // nacosClient nacos.ConfigClient // Nacos config client
- // serviceDiscovery nacos.NamingClient // Nacos service
discovery client
}
// NewToolRegistry creates a new tool registry
@@ -364,5 +359,3 @@ func (r *ToolRegistry) ToMCPPrompts() ([]map[string]any,
error) {
return mcpPrompts, nil
}
-
-// TODO: Dynamic update functionality - implement when integrating with Nacos
diff --git a/pkg/filter/mcp/mcpserver/response.go
b/pkg/filter/mcp/mcpserver/response.go
index 7612bb2b..8e34543d 100644
--- a/pkg/filter/mcp/mcpserver/response.go
+++ b/pkg/filter/mcp/mcpserver/response.go
@@ -151,3 +151,30 @@ func (eh *ErrorHandler) sendResponse(ctx *MCPContext,
response any) filter.Filte
ctx.SendLocalReply(http.StatusOK, responseBody)
return filter.Stop
}
+
+// ServerNotification creates a JSON-RPC notification from server
+func (rb *ResponseBuilder) ServerNotification(method string, params
map[string]any) mcp.JSONRPCNotification {
+ notificationParams := mcp.NotificationParams{
+ AdditionalFields: params,
+ }
+
+ return mcp.JSONRPCNotification{
+ JSONRPC: mcp.JSONRPC_VERSION,
+ Notification: mcp.Notification{
+ Method: method,
+ Params: notificationParams,
+ },
+ }
+}
+
+// ServerRequest creates a JSON-RPC request from server
+func (rb *ResponseBuilder) ServerRequest(id any, method string, params any)
mcp.JSONRPCRequest {
+ return mcp.JSONRPCRequest{
+ JSONRPC: mcp.JSONRPC_VERSION,
+ ID: mcp.NewRequestId(id),
+ Params: params,
+ Request: mcp.Request{
+ Method: method,
+ },
+ }
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/content_negotiator.go
b/pkg/filter/mcp/mcpserver/transport/content_negotiator.go
new file mode 100644
index 00000000..07d7f060
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/content_negotiator.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "strings"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+)
+
+// ResponseFormat represents the response format type
+type ResponseFormat int
+
+const (
+ ResponseFormatJSON ResponseFormat = iota // Traditional JSON
response
+ ResponseFormatSSE // Server-Sent Events
response
+ ResponseFormatAccepted // 202 Accepted (no
immediate response)
+)
+
+// ContentNegotiator handles HTTP content negotiation for MCP responses
+type ContentNegotiator struct{}
+
+// NewContentNegotiator creates a new content negotiator
+func NewContentNegotiator() *ContentNegotiator {
+ return &ContentNegotiator{}
+}
+
+// NegotiateResponse determines the appropriate response format based on
Accept header and session state
+func (cn *ContentNegotiator) NegotiateResponse(acceptHeader string, hasSession
bool) ResponseFormat {
+ _, supportsSSE := cn.parseAcceptHeader(acceptHeader)
+
+ // Per MCP spec: when a session exists and client accepts SSE, use SSE
for consistent streaming
+ // Otherwise, default to JSON (for backward compatibility and when no
session available)
+ if supportsSSE && hasSession {
+ return ResponseFormatSSE
+ }
+
+ return ResponseFormatJSON
+}
+
+// SupportsSSE checks if the Accept header includes text/event-stream
+func (cn *ContentNegotiator) SupportsSSE(acceptHeader string) bool {
+ _, supportsSSE := cn.parseAcceptHeader(acceptHeader)
+ return supportsSSE
+}
+
+// SupportsJSON checks if the Accept header includes application/json
+func (cn *ContentNegotiator) SupportsJSON(acceptHeader string) bool {
+ supportsJSON, _ := cn.parseAcceptHeader(acceptHeader)
+ return supportsJSON
+}
+
+// parseAcceptHeader parses the Accept header and returns support for JSON and
SSE
+func (cn *ContentNegotiator) parseAcceptHeader(acceptHeader string)
(supportsJSON, supportsSSE bool) {
+ if acceptHeader == "" {
+ // Default to JSON support for backward compatibility
+ return true, false
+ }
+
+ acceptHeader = strings.ToLower(acceptHeader)
+
+ // Check for JSON support
+ if strings.Contains(acceptHeader, constant.HeaderValueApplicationJson)
||
+ strings.Contains(acceptHeader,
constant.MediaTypeApplicationWild) ||
+ strings.Contains(acceptHeader, constant.MediaTypeWildcard) {
+ supportsJSON = true
+ }
+
+ // Check for SSE support
+ if strings.Contains(acceptHeader, constant.HeaderValueTextEventStream)
||
+ strings.Contains(acceptHeader, constant.MediaTypeTextWild) ||
+ strings.Contains(acceptHeader, constant.MediaTypeWildcard) {
+ supportsSSE = true
+ }
+
+ return supportsJSON, supportsSSE
+}
+
+// GetPreferredContentType returns the Content-Type header value for the given
format
+func (cn *ContentNegotiator) GetPreferredContentType(format ResponseFormat)
string {
+ switch format {
+ case ResponseFormatSSE:
+ return constant.HeaderValueTextEventStream
+ case ResponseFormatJSON, ResponseFormatAccepted:
+ return constant.HeaderValueApplicationJson
+ default:
+ return constant.HeaderValueApplicationJson
+ }
+}
+
+// ValidateAcceptHeaderForMethod validates that the Accept header is
appropriate for the HTTP method
+func (cn *ContentNegotiator) ValidateAcceptHeaderForMethod(method,
acceptHeader string) error {
+ switch strings.ToUpper(method) {
+ case constant.Get:
+ // GET requests for SSE must accept text/event-stream
+ if !cn.SupportsSSE(acceptHeader) {
+ return &AcceptHeaderError{
+ Method: method,
+ AcceptHeader: acceptHeader,
+ Required:
constant.HeaderValueTextEventStream,
+ }
+ }
+ case constant.Post:
+ // POST requests should accept both application/json and
text/event-stream
+ supportsJSON, supportsSSE := cn.parseAcceptHeader(acceptHeader)
+ if !supportsJSON && !supportsSSE {
+ return &AcceptHeaderError{
+ Method: method,
+ AcceptHeader: acceptHeader,
+ Required:
constant.HeaderValueApplicationJson + " or " +
constant.HeaderValueTextEventStream,
+ }
+ }
+ }
+ return nil
+}
+
+// AcceptHeaderError represents an Accept header validation error
+type AcceptHeaderError struct {
+ Method string
+ AcceptHeader string
+ Required string
+}
+
+func (e *AcceptHeaderError) Error() string {
+ if e.AcceptHeader == "" {
+ return "missing Accept header for " + e.Method + " request,
required: " + e.Required
+ }
+ return "invalid Accept header '" + e.AcceptHeader + "' for " + e.Method
+ " request, required: " + e.Required
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/content_negotiator_test.go
b/pkg/filter/mcp/mcpserver/transport/content_negotiator_test.go
new file mode 100644
index 00000000..43d66014
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/content_negotiator_test.go
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/mark3labs/mcp-go/mcp"
+)
+
+func TestNewContentNegotiator(t *testing.T) {
+ cn := NewContentNegotiator()
+ if cn == nil {
+ t.Fatal("NewContentNegotiator returned nil")
+ }
+}
+
+func TestParseAcceptHeader(t *testing.T) {
+ cn := NewContentNegotiator()
+
+ tests := []struct {
+ name string
+ acceptHeader string
+ wantJSON bool
+ wantSSE bool
+ }{
+ {
+ name: "empty header defaults to JSON",
+ acceptHeader: "",
+ wantJSON: true,
+ wantSSE: false,
+ },
+ {
+ name: "application/json only",
+ acceptHeader: "application/json",
+ wantJSON: true,
+ wantSSE: false,
+ },
+ {
+ name: "text/event-stream only",
+ acceptHeader: "text/event-stream",
+ wantJSON: false,
+ wantSSE: true,
+ },
+ {
+ name: "both formats",
+ acceptHeader: "application/json, text/event-stream",
+ wantJSON: true,
+ wantSSE: true,
+ },
+ {
+ name: "wildcard accepts all",
+ acceptHeader: "*/*",
+ wantJSON: true,
+ wantSSE: true,
+ },
+ {
+ name: "application wildcard",
+ acceptHeader: "application/*",
+ wantJSON: true,
+ wantSSE: false,
+ },
+ {
+ name: "text wildcard",
+ acceptHeader: "text/*",
+ wantJSON: false,
+ wantSSE: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gotJSON, gotSSE := cn.parseAcceptHeader(tt.acceptHeader)
+ if gotJSON != tt.wantJSON {
+ t.Errorf("parseAcceptHeader() JSON = %v, want
%v", gotJSON, tt.wantJSON)
+ }
+ if gotSSE != tt.wantSSE {
+ t.Errorf("parseAcceptHeader() SSE = %v, want
%v", gotSSE, tt.wantSSE)
+ }
+ })
+ }
+}
+
+func TestSupportsSSE(t *testing.T) {
+ cn := NewContentNegotiator()
+
+ tests := []struct {
+ name string
+ acceptHeader string
+ want bool
+ }{
+ {"text/event-stream", "text/event-stream", true},
+ {"with charset", "text/event-stream;charset=utf-8", true},
+ {"wildcard", "*/*", true},
+ {"text wildcard", "text/*", true},
+ {"JSON only", "application/json", false},
+ {"empty", "", false},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := cn.SupportsSSE(tt.acceptHeader); got !=
tt.want {
+ t.Errorf("SupportsSSE() = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}
+
+func TestSupportsJSON(t *testing.T) {
+ cn := NewContentNegotiator()
+
+ tests := []struct {
+ name string
+ acceptHeader string
+ want bool
+ }{
+ {"application/json", "application/json", true},
+ {"with charset", "application/json;charset=utf-8", true},
+ {"wildcard", "*/*", true},
+ {"application wildcard", "application/*", true},
+ {"SSE only", "text/event-stream", false},
+ {"empty defaults to JSON", "", true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := cn.SupportsJSON(tt.acceptHeader); got !=
tt.want {
+ t.Errorf("SupportsJSON() = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}
+
+func TestNegotiateResponse(t *testing.T) {
+ cn := NewContentNegotiator()
+
+ tests := []struct {
+ name string
+ acceptHeader string
+ method string
+ hasSession bool
+ want ResponseFormat
+ }{
+ {
+ name: "no accept header defaults to JSON",
+ acceptHeader: "",
+ method: "initialize",
+ hasSession: false,
+ want: ResponseFormatJSON,
+ },
+ {
+ name: "JSON only returns JSON",
+ acceptHeader: "application/json",
+ method: "initialize",
+ hasSession: true,
+ want: ResponseFormatJSON,
+ },
+ {
+ name: "SSE only with session",
+ acceptHeader: "text/event-stream",
+ method: "initialize",
+ hasSession: true,
+ want: ResponseFormatSSE, // Returns SSE if
session exists and only SSE accepted
+ },
+ {
+ name: "tool call with both and session prefers
SSE",
+ acceptHeader: "application/json, text/event-stream",
+ method: string(mcp.MethodToolsCall),
+ hasSession: true,
+ want: ResponseFormatSSE,
+ },
+ {
+ name: "tool call without session uses JSON",
+ acceptHeader: "application/json, text/event-stream",
+ method: string(mcp.MethodToolsCall),
+ hasSession: false,
+ want: ResponseFormatJSON,
+ },
+ {
+ name: "any method with both and session prefers
SSE",
+ acceptHeader: "application/json, text/event-stream",
+ method: string(mcp.MethodToolsList),
+ hasSession: true,
+ want: ResponseFormatSSE,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := cn.NegotiateResponse(tt.acceptHeader,
tt.hasSession)
+ if got != tt.want {
+ t.Errorf("NegotiateResponse() = %v, want %v",
got, tt.want)
+ }
+ })
+ }
+}
+
+func TestGetPreferredContentType(t *testing.T) {
+ cn := NewContentNegotiator()
+
+ tests := []struct {
+ name string
+ format ResponseFormat
+ want string
+ }{
+ {
+ name: "JSON format",
+ format: ResponseFormatJSON,
+ want: "application/json",
+ },
+ {
+ name: "SSE format",
+ format: ResponseFormatSSE,
+ want: "text/event-stream",
+ },
+ {
+ name: "Accepted format",
+ format: ResponseFormatAccepted,
+ want: "application/json",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := cn.GetPreferredContentType(tt.format); got !=
tt.want {
+ t.Errorf("GetPreferredContentType() = %v, want
%v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager.go
b/pkg/filter/mcp/mcpserver/transport/session_manager.go
new file mode 100644
index 00000000..1eee222e
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager.go
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "crypto/rand"
+ "encoding/hex"
+ "io"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+const (
+ SessionTimeout = 30 * time.Minute
+ CleanupInterval = 5 * time.Minute
+ KeepaliveInterval = 30 * time.Second
+)
+
+// MCPSession represents an active MCP session
+type MCPSession struct {
+ ID string
+ CreatedAt time.Time
+ LastActivity time.Time
+ PipeWriter *io.PipeWriter // Pipe writer for sending SSE messages
+ Done chan struct{}
+}
+
+// SessionManager manages MCP sessions for SSE connections
+type SessionManager struct {
+ sessions map[string]*MCPSession
+ mu sync.RWMutex
+ stopCh chan struct{}
+ once sync.Once
+}
+
+// NewSessionManager creates a new session manager
+func NewSessionManager() *SessionManager {
+ sm := &SessionManager{
+ sessions: make(map[string]*MCPSession),
+ stopCh: make(chan struct{}),
+ }
+ go sm.startCleanupRoutine()
+ return sm
+}
+
+// EnsureSession gets existing session or creates new one
+func (sm *SessionManager) EnsureSession(sessionIDHeader string) (*MCPSession,
bool) {
+ sm.mu.Lock()
+ defer sm.mu.Unlock()
+
+ // Try to get existing session
+ if sessionIDHeader != "" {
+ if session, exists := sm.sessions[sessionIDHeader]; exists {
+ session.LastActivity = time.Now()
+ return session, false // existing session
+ }
+ }
+
+ // Create new session
+ sessionID := sm.generateSessionID()
+ session := &MCPSession{
+ ID: sessionID,
+ CreatedAt: time.Now(),
+ LastActivity: time.Now(),
+ Done: make(chan struct{}),
+ }
+
+ sm.sessions[sessionID] = session
+ logger.Infof("[dubbo-go-pixiu] mcp server created new session: %s",
sessionID)
+ return session, true // new session
+}
+
+// Session retrieves a session by ID
+func (sm *SessionManager) Session(sessionID string) (*MCPSession, bool) {
+ sm.mu.RLock()
+ defer sm.mu.RUnlock()
+
+ session, exists := sm.sessions[sessionID]
+ if exists {
+ session.LastActivity = time.Now()
+ }
+ return session, exists
+}
+
+// RemoveSession removes a session and cleans up resources
+func (sm *SessionManager) RemoveSession(sessionID string) {
+ sm.mu.Lock()
+ defer sm.mu.Unlock()
+
+ if session, exists := sm.sessions[sessionID]; exists {
+ // Close Done channel to signal goroutines
+ close(session.Done)
+
+ // Close PipeWriter to end the SSE stream
+ if session.PipeWriter != nil {
+ session.PipeWriter.Close()
+ }
+
+ delete(sm.sessions, sessionID)
+ logger.Infof("[dubbo-go-pixiu] mcp server removed session: %s",
sessionID)
+ }
+}
+
+// Stop stops the session manager
+func (sm *SessionManager) Stop() {
+ sm.once.Do(func() {
+ close(sm.stopCh)
+
+ sm.mu.Lock()
+ for sessionID, session := range sm.sessions {
+ close(session.Done)
+ delete(sm.sessions, sessionID)
+ }
+ sm.mu.Unlock()
+ })
+}
+
+// generateSessionID generates a unique session ID
+func (sm *SessionManager) generateSessionID() string {
+ bytes := make([]byte, 16)
+ if _, err := rand.Read(bytes); err != nil {
+ // Fallback to timestamp-based ID
+ return hex.EncodeToString([]byte(time.Now().String()))
+ }
+ return hex.EncodeToString(bytes)
+}
+
+// startCleanupRoutine starts the session cleanup routine
+func (sm *SessionManager) startCleanupRoutine() {
+ ticker := time.NewTicker(CleanupInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ sm.cleanupExpiredSessions()
+ case <-sm.stopCh:
+ return
+ }
+ }
+}
+
+// cleanupExpiredSessions removes expired sessions
+func (sm *SessionManager) cleanupExpiredSessions() {
+ sm.mu.Lock()
+ defer sm.mu.Unlock()
+
+ now := time.Now()
+ var toRemove []string
+
+ for sessionID, session := range sm.sessions {
+ if now.Sub(session.LastActivity) > SessionTimeout {
+ toRemove = append(toRemove, sessionID)
+ }
+ }
+
+ for _, sessionID := range toRemove {
+ if session, exists := sm.sessions[sessionID]; exists {
+ close(session.Done)
+ delete(sm.sessions, sessionID)
+ logger.Infof("[dubbo-go-pixiu] mcp server cleaned up
expired session: %s", sessionID)
+ }
+ }
+
+ if len(toRemove) > 0 {
+ logger.Debugf("[dubbo-go-pixiu] mcp server cleaned up %d
expired sessions", len(toRemove))
+ }
+}
+
+// AllSessionIDs returns all active session IDs
+func (sm *SessionManager) AllSessionIDs() []string {
+ sm.mu.RLock()
+ defer sm.mu.RUnlock()
+
+ ids := make([]string, 0, len(sm.sessions))
+ for id := range sm.sessions {
+ ids = append(ids, id)
+ }
+ return ids
+}
+
+// ActiveSessionCount returns the number of active sessions
+func (sm *SessionManager) ActiveSessionCount() int {
+ sm.mu.RLock()
+ defer sm.mu.RUnlock()
+ return len(sm.sessions)
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
new file mode 100644
index 00000000..93acc5c4
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/session_manager_test.go
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "testing"
+ "time"
+)
+
+func TestNewSessionManager(t *testing.T) {
+ sm := NewSessionManager()
+ if sm == nil {
+ t.Fatal("NewSessionManager returned nil")
+ }
+ if sm.sessions == nil {
+ t.Error("sessions map not initialized")
+ }
+ if sm.stopCh == nil {
+ t.Error("stopCh not initialized")
+ }
+ sm.Stop()
+}
+
+func TestEnsureSession_CreateNew(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Test creating new session with empty header
+ session, isNew := sm.EnsureSession("")
+ if !isNew {
+ t.Error("Expected new session to be created")
+ }
+ if session == nil {
+ t.Fatal("Session should not be nil")
+ }
+ if session.ID == "" {
+ t.Error("Session ID should not be empty")
+ }
+ if session.Done == nil {
+ t.Error("Session Done channel should be initialized")
+ }
+ if len(session.ID) != 32 {
+ t.Errorf("Expected session ID length 32, got %d",
len(session.ID))
+ }
+}
+
+func TestEnsureSession_ReuseExisting(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Create first session
+ session1, isNew1 := sm.EnsureSession("")
+ if !isNew1 {
+ t.Error("Expected new session")
+ }
+
+ // Try to get same session by ID
+ session2, isNew2 := sm.EnsureSession(session1.ID)
+ if isNew2 {
+ t.Error("Expected existing session, not new")
+ }
+ if session1.ID != session2.ID {
+ t.Error("Should return same session")
+ }
+
+ // Verify last activity was updated
+ if session2.LastActivity.Before(session1.LastActivity) {
+ t.Error("LastActivity should be updated")
+ }
+}
+
+func TestSession(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Create session
+ session1, _ := sm.EnsureSession("")
+ sessionID := session1.ID
+
+ // Retrieve session
+ session2, exists := sm.Session(sessionID)
+ if !exists {
+ t.Error("Session should exist")
+ }
+ if session2.ID != sessionID {
+ t.Error("Retrieved wrong session")
+ }
+
+ // Try non-existent session
+ _, exists = sm.Session("non-existent-id")
+ if exists {
+ t.Error("Non-existent session should not be found")
+ }
+}
+
+func TestRemoveSession(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Create session
+ session, _ := sm.EnsureSession("")
+ sessionID := session.ID
+
+ // Verify session exists
+ _, exists := sm.Session(sessionID)
+ if !exists {
+ t.Fatal("Session should exist before removal")
+ }
+
+ // Remove session
+ sm.RemoveSession(sessionID)
+
+ // Verify session removed
+ _, exists = sm.Session(sessionID)
+ if exists {
+ t.Error("Session should be removed")
+ }
+
+ // Verify Done channel closed
+ select {
+ case <-session.Done:
+ // Expected: channel should be closed
+ default:
+ t.Error("Done channel should be closed")
+ }
+
+ // Test removing non-existent session (should not panic)
+ sm.RemoveSession("non-existent-id")
+}
+
+func TestSessionCleanup(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Create session
+ session, _ := sm.EnsureSession("")
+ sessionID := session.ID
+
+ // Manually set old LastActivity to simulate timeout
+ sm.mu.Lock()
+ session.LastActivity = time.Now().Add(-SessionTimeout - 1*time.Minute)
+ sm.mu.Unlock()
+
+ // Trigger cleanup
+ sm.cleanupExpiredSessions()
+
+ // Verify session was cleaned up
+ _, exists := sm.Session(sessionID)
+ if exists {
+ t.Error("Expired session should be cleaned up")
+ }
+}
+
+func TestSessionManager_Stop(t *testing.T) {
+ sm := NewSessionManager()
+
+ // Create multiple sessions
+ session1, _ := sm.EnsureSession("")
+ session2, _ := sm.EnsureSession("")
+
+ // Stop manager
+ sm.Stop()
+
+ // Verify all sessions removed
+ _, exists1 := sm.Session(session1.ID)
+ _, exists2 := sm.Session(session2.ID)
+ if exists1 || exists2 {
+ t.Error("All sessions should be removed on Stop")
+ }
+
+ // Verify Done channels closed
+ select {
+ case <-session1.Done:
+ // Expected
+ default:
+ t.Error("Session1 Done should be closed")
+ }
+
+ select {
+ case <-session2.Done:
+ // Expected
+ default:
+ t.Error("Session2 Done should be closed")
+ }
+
+ // Verify stop channel closed
+ select {
+ case <-sm.stopCh:
+ // Expected
+ default:
+ t.Error("stopCh should be closed")
+ }
+}
+
+func TestGenerateSessionID(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Generate multiple session IDs
+ ids := make(map[string]bool)
+ for i := 0; i < 100; i++ {
+ id := sm.generateSessionID()
+ if id == "" {
+ t.Error("Generated session ID should not be empty")
+ }
+ if len(id) != 32 {
+ t.Errorf("Expected session ID length 32, got %d",
len(id))
+ }
+ if ids[id] {
+ t.Errorf("Duplicate session ID generated: %s", id)
+ }
+ ids[id] = true
+ }
+}
+
+func TestConcurrentSessionAccess(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ // Create initial session
+ session, _ := sm.EnsureSession("")
+ sessionID := session.ID
+
+ done := make(chan bool, 3)
+
+ // Concurrent reads
+ go func() {
+ for i := 0; i < 100; i++ {
+ sm.Session(sessionID)
+ time.Sleep(time.Millisecond)
+ }
+ done <- true
+ }()
+
+ // Concurrent writes (update LastActivity)
+ go func() {
+ for i := 0; i < 100; i++ {
+ sm.Session(sessionID)
+ time.Sleep(time.Millisecond)
+ }
+ done <- true
+ }()
+
+ // Concurrent session creation
+ go func() {
+ for i := 0; i < 10; i++ {
+ sm.EnsureSession("")
+ time.Sleep(10 * time.Millisecond)
+ }
+ done <- true
+ }()
+
+ // Wait for all goroutines
+ for i := 0; i < 3; i++ {
+ <-done
+ }
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/sse_handler.go
b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
new file mode 100644
index 00000000..166a9f95
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/sse_handler.go
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// SSEHandler provides utility functions for SSE message formatting
+type SSEHandler struct {
+ sessionManager *SessionManager
+}
+
+// NewSSEHandler creates a new SSE handler
+func NewSSEHandler(sessionManager *SessionManager) *SSEHandler {
+ return &SSEHandler{
+ sessionManager: sessionManager,
+ }
+}
+
+// SendSSEMessage sends a message through the SSE pipe
+func (h *SSEHandler) SendSSEMessage(session *MCPSession, message any) error {
+ if session.PipeWriter == nil {
+ return fmt.Errorf("SSE pipe not established")
+ }
+
+ // Marshal message to JSON
+ messageJSON, err := json.Marshal(message)
+ if err != nil {
+ return fmt.Errorf("failed to marshal SSE message: %w", err)
+ }
+
+ // Format as SSE event
+ sseData := h.FormatSSEMessage(string(messageJSON))
+
+ // Write to pipe
+ if _, err := session.PipeWriter.Write([]byte(sseData)); err != nil {
+ logger.Errorf("[dubbo-go-pixiu] mcp server failed to send SSE
message: %v", err)
+ return fmt.Errorf("failed to write to SSE pipe: %w", err)
+ }
+
+ session.LastActivity = time.Now()
+ logger.Debugf("[dubbo-go-pixiu] mcp server sent SSE message to session:
%s", session.ID)
+ return nil
+}
+
+// FormatSSEMessage formats a message as SSE data
+func (h *SSEHandler) FormatSSEMessage(messageJSON string) string {
+ return fmt.Sprintf("data: %s\n\n", messageJSON)
+}
+
+// FormatSSEEvent formats a custom SSE event with event type
+func (h *SSEHandler) FormatSSEEvent(eventType, data string) string {
+ if eventType != "" {
+ return fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, data)
+ }
+ return fmt.Sprintf("data: %s\n\n", data)
+}
+
+// FormatSSEEventWithID formats a SSE event with ID for resumability
+func (h *SSEHandler) FormatSSEEventWithID(eventID int64, data string) string {
+ return fmt.Sprintf("id: %d\ndata: %s\n\n", eventID, data)
+}
+
+// FormatSSEKeepalive formats a SSE keepalive comment
+func (h *SSEHandler) FormatSSEKeepalive(timestamp int64) string {
+ return fmt.Sprintf(": keepalive %d\n\n", timestamp)
+}
diff --git a/pkg/filter/mcp/mcpserver/transport/sse_handler_test.go
b/pkg/filter/mcp/mcpserver/transport/sse_handler_test.go
new file mode 100644
index 00000000..b1246d8f
--- /dev/null
+++ b/pkg/filter/mcp/mcpserver/transport/sse_handler_test.go
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transport
+
+import (
+ "io"
+ "testing"
+ "time"
+)
+
+func TestNewSSEHandler(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+
+ handler := NewSSEHandler(sm)
+ if handler == nil {
+ t.Fatal("NewSSEHandler returned nil")
+ }
+ if handler.sessionManager == nil {
+ t.Error("sessionManager not initialized")
+ }
+}
+
+func TestFormatSSEMessage(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ tests := []struct {
+ name string
+ input string
+ expected string
+ }{
+ {
+ name: "simple message",
+ input: `{"jsonrpc":"2.0","method":"test"}`,
+ expected: "data:
{\"jsonrpc\":\"2.0\",\"method\":\"test\"}\n\n",
+ },
+ {
+ name: "empty message",
+ input: "",
+ expected: "data: \n\n",
+ },
+ {
+ name: "message with newlines",
+ input: "line1\nline2",
+ expected: "data: line1\nline2\n\n",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := handler.FormatSSEMessage(tt.input)
+ if result != tt.expected {
+ t.Errorf("FormatSSEMessage() = %q, want %q",
result, tt.expected)
+ }
+ })
+ }
+}
+
+func TestFormatSSEEvent(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ tests := []struct {
+ name string
+ eventType string
+ data string
+ expected string
+ }{
+ {
+ name: "with event type",
+ eventType: "message",
+ data: "test data",
+ expected: "event: message\ndata: test data\n\n",
+ },
+ {
+ name: "without event type",
+ eventType: "",
+ data: "test data",
+ expected: "data: test data\n\n",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := handler.FormatSSEEvent(tt.eventType, tt.data)
+ if result != tt.expected {
+ t.Errorf("FormatSSEEvent() = %q, want %q",
result, tt.expected)
+ }
+ })
+ }
+}
+
+func TestFormatSSEEventWithID(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ result := handler.FormatSSEEventWithID(123, "test data")
+ expected := "id: 123\ndata: test data\n\n"
+ if result != expected {
+ t.Errorf("FormatSSEEventWithID() = %q, want %q", result,
expected)
+ }
+}
+
+func TestFormatSSEKeepalive(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ timestamp := time.Now().Unix()
+ result := handler.FormatSSEKeepalive(timestamp)
+
+ // Just check format is correct (contains keepalive and timestamp)
+ if len(result) < 10 {
+ t.Error("Keepalive format too short")
+ }
+ if result[0] != ':' {
+ t.Error("Keepalive should start with ':'")
+ }
+ if result[len(result)-2:] != "\n\n" {
+ t.Error("Keepalive should end with \\n\\n")
+ }
+}
+
+func TestSendSSEMessage(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ // Create session with pipe
+ session, _ := sm.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+
+ // Send message in goroutine
+ message := map[string]any{
+ "jsonrpc": "2.0",
+ "method": "test",
+ }
+
+ errCh := make(chan error, 1)
+ go func() {
+ errCh <- handler.SendSSEMessage(session, message)
+ }()
+
+ // Read from pipe
+ buf := make([]byte, 1024)
+ n, err := pipeReader.Read(buf)
+ if err != nil {
+ t.Fatalf("Failed to read from pipe: %v", err)
+ }
+
+ received := string(buf[:n])
+ if len(received) == 0 {
+ t.Error("Should receive data")
+ }
+ if received[:5] != "data:" {
+ t.Error("Should start with 'data:'")
+ }
+ if received[len(received)-2:] != "\n\n" {
+ t.Error("Should end with \\n\\n")
+ }
+
+ // Check send error
+ if err := <-errCh; err != nil {
+ t.Errorf("SendSSEMessage failed: %v", err)
+ }
+
+ pipeWriter.Close()
+ pipeReader.Close()
+}
+
+func TestSendSSEMessage_NoPipe(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ // Create session without pipe
+ session, _ := sm.EnsureSession("")
+
+ message := map[string]any{"test": "data"}
+ err := handler.SendSSEMessage(session, message)
+ if err == nil {
+ t.Error("Expected error when PipeWriter is nil")
+ }
+ if err.Error() != "SSE pipe not established" {
+ t.Errorf("Unexpected error message: %v", err)
+ }
+}
+
+func TestSendSSEMessage_InvalidJSON(t *testing.T) {
+ sm := NewSessionManager()
+ defer sm.Stop()
+ handler := NewSSEHandler(sm)
+
+ session, _ := sm.EnsureSession("")
+ pipeReader, pipeWriter := io.Pipe()
+ session.PipeWriter = pipeWriter
+ defer pipeReader.Close()
+ defer pipeWriter.Close()
+
+ // Try to send invalid message (channel cannot be marshaled)
+ invalidMessage := make(chan int)
+ err := handler.SendSSEMessage(session, invalidMessage)
+ if err == nil {
+ t.Error("Expected error when marshaling invalid message")
+ }
+}