This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 03c5e6e784 Fix/mcp sse content type and endpoint (#15763)
03c5e6e784 is described below

commit 03c5e6e7842358499eb1d2608baf34484dd6c574
Author: Rain Yu <[email protected]>
AuthorDate: Sat Nov 8 07:09:31 2025 +0800

    Fix/mcp sse content type and endpoint (#15763)
    
    * fix(http12,mcp): set SSE content-type and honor configured MCP message path
    
    Ensure SSE responses use the text/event-stream content type, and use the path configured via dubbo.protocol.triple.rest.mcp.path.message in the endpoint event.
    
    * unitTest(http12): assert SSE [ServerSentEventEncoder] content-type and encoding format
    
    * ran spotless apply command
    
    * fix ci
    
    ---------
    
    Co-authored-by: Akshat Sinha <[email protected]>
    Co-authored-by: heliang <[email protected]>
---
 .../transport/DubboMcpSseTransportProvider.java    |  8 ++-
 .../DubboMcpSseTransportProviderTest.java          |  7 +-
 .../http12/message/ServerSentEventEncoder.java     |  5 +-
 .../http12/message/ServerSentEventEncoderTest.java | 82 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 4 deletions(-)

diff --git 
a/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
 
b/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
index 27ababf547..2fce772579 100644
--- 
a/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
+++ 
b/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
@@ -17,11 +17,14 @@
 package org.apache.dubbo.mcp.transport;
 
 import org.apache.dubbo.cache.support.expiring.ExpiringMap;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.utils.IOUtils;
 import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.mcp.McpConstant;
 import org.apache.dubbo.remoting.http12.HttpMethods;
 import org.apache.dubbo.remoting.http12.HttpRequest;
 import org.apache.dubbo.remoting.http12.HttpResponse;
@@ -29,6 +32,7 @@ import org.apache.dubbo.remoting.http12.HttpResult;
 import org.apache.dubbo.remoting.http12.HttpStatus;
 import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
 import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -159,7 +163,9 @@ public class DubboMcpSseTransportProvider implements 
McpServerTransportProvider
                 new DubboMcpSessionTransport(responseObserver, objectMapper);
         McpServerSession mcpServerSession = 
sessionFactory.create(dubboMcpSessionTransport);
         sessions.put(mcpServerSession.getId(), mcpServerSession);
-        sendEvent(responseObserver, ENDPOINT_EVENT_TYPE, "/mcp/message" + 
"?sessionId=" + mcpServerSession.getId());
+        Configuration conf = 
ConfigurationUtils.getGlobalConfiguration(ApplicationModel.defaultModel());
+        String messagePath = 
conf.getString(McpConstant.SETTINGS_MCP_PATHS_MESSAGE, "/mcp/message");
+        sendEvent(responseObserver, ENDPOINT_EVENT_TYPE, messagePath + 
"?sessionId=" + mcpServerSession.getId());
     }
 
     private void refreshSessionExpire(McpServerSession session) {
diff --git 
a/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
 
b/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
index ee8b7cf0ce..bd92f8f870 100644
--- 
a/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
+++ 
b/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
@@ -99,7 +100,11 @@ class DubboMcpSseTransportProviderTest {
         transportProvider.handleRequest(responseObserver);
 
         verify(httpRequest, times(1)).method();
-        verify(responseObserver, times(1)).onNext(any(ServerSentEvent.class));
+        ArgumentCaptor<ServerSentEvent> captor = 
ArgumentCaptor.forClass(ServerSentEvent.class);
+        verify(responseObserver, times(1)).onNext(captor.capture());
+        ServerSentEvent evt = captor.getValue();
+        Assertions.assertEquals("endpoint", evt.getEvent());
+        Assertions.assertTrue(((String) 
evt.getData()).startsWith("/mcp/message?sessionId="));
     }
 
     @Test
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
index 7338271df6..deaba392b8 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
@@ -93,12 +93,13 @@ public final class ServerSentEventEncoder implements 
HttpMessageEncoder {
 
     @Override
     public String contentType() {
-        return httpMessageEncoder.contentType();
+        // SSE always uses text/event-stream, regardless of the underlying data encoder.
+        return MediaType.TEXT_EVENT_STREAM.getName();
     }
 
     @Override
     public MediaType mediaType() {
-        return httpMessageEncoder.mediaType();
+        return MediaType.TEXT_EVENT_STREAM;
     }
 
     @Override
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoderTest.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoderTest.java
new file mode 100644
index 0000000000..3cf05b069e
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.message;
+
+import org.apache.dubbo.remoting.http12.exception.EncodeException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class ServerSentEventEncoderTest {
+
+    static class DummyJsonEncoder implements HttpMessageEncoder {
+        @Override
+        public void encode(OutputStream outputStream, Object data, 
java.nio.charset.Charset charset)
+                throws EncodeException {
+            try {
+                if (data instanceof byte[]) {
+                    outputStream.write((byte[]) data);
+                } else {
+                    outputStream.write(String.valueOf(data).getBytes(charset));
+                }
+            } catch (Exception e) {
+                throw new EncodeException("encode error", e);
+            }
+        }
+
+        @Override
+        public MediaType mediaType() {
+            return MediaType.APPLICATION_JSON;
+        }
+
+        @Override
+        public boolean supports(String mediaType) {
+            return true;
+        }
+    }
+
+    @Test
+    void shouldUseTextEventStreamContentType() {
+        ServerSentEventEncoder sse = new ServerSentEventEncoder(new 
DummyJsonEncoder());
+        Assertions.assertEquals(MediaType.TEXT_EVENT_STREAM, sse.mediaType());
+        Assertions.assertEquals(MediaType.TEXT_EVENT_STREAM.getName(), 
sse.contentType());
+    }
+
+    @Test
+    void shouldEncodeServerSentEventFormat() {
+        ServerSentEventEncoder sse = new ServerSentEventEncoder(new 
DummyJsonEncoder());
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        sse.encode(
+                bos,
+                ServerSentEvent.builder()
+                        .event("message")
+                        .data("{\"a\":1}")
+                        .id("1")
+                        .build(),
+                StandardCharsets.UTF_8);
+        byte[] bytes = bos.toByteArray();
+        String text = new String(bytes, StandardCharsets.UTF_8);
+        Assertions.assertTrue(text.contains("id:1\n"));
+        Assertions.assertTrue(text.contains("event:message\n"));
+        Assertions.assertTrue(text.contains("data:{\"a\":1}\n"));
+        Assertions.assertTrue(text.endsWith("\n"));
+    }
+}

Reply via email to