This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 03c5e6e784 Fix/mcp sse content type and endpoint (#15763)
03c5e6e784 is described below
commit 03c5e6e7842358499eb1d2608baf34484dd6c574
Author: Rain Yu <[email protected]>
AuthorDate: Sat Nov 8 07:09:31 2025 +0800
Fix/mcp sse content type and endpoint (#15763)
* fix(http12,mcp): set SSE content-type and honor configured MCP message
path
Ensure text/event-stream for SSE and use
dubbo.protocol.triple.rest.mcp.path.message in endpoint event.
* unitTest(http12):assert SSE[ServerSentEventEncoder] content-type and
encoding format
* ran spotless apply command
* fix ci
---------
Co-authored-by: Akshat Sinha <[email protected]>
Co-authored-by: heliang <[email protected]>
---
.../transport/DubboMcpSseTransportProvider.java | 8 ++-
.../DubboMcpSseTransportProviderTest.java | 7 +-
.../http12/message/ServerSentEventEncoder.java | 5 +-
.../http12/message/ServerSentEventEncoderTest.java | 82 ++++++++++++++++++++++
4 files changed, 98 insertions(+), 4 deletions(-)
diff --git
a/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
b/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
index 27ababf547..2fce772579 100644
---
a/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
+++
b/dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java
@@ -17,11 +17,14 @@
package org.apache.dubbo.mcp.transport;
import org.apache.dubbo.cache.support.expiring.ExpiringMap;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.utils.IOUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.mcp.McpConstant;
import org.apache.dubbo.remoting.http12.HttpMethods;
import org.apache.dubbo.remoting.http12.HttpRequest;
import org.apache.dubbo.remoting.http12.HttpResponse;
@@ -29,6 +32,7 @@ import org.apache.dubbo.remoting.http12.HttpResult;
import org.apache.dubbo.remoting.http12.HttpStatus;
import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -159,7 +163,9 @@ public class DubboMcpSseTransportProvider implements
McpServerTransportProvider
new DubboMcpSessionTransport(responseObserver, objectMapper);
McpServerSession mcpServerSession =
sessionFactory.create(dubboMcpSessionTransport);
sessions.put(mcpServerSession.getId(), mcpServerSession);
- sendEvent(responseObserver, ENDPOINT_EVENT_TYPE, "/mcp/message" +
"?sessionId=" + mcpServerSession.getId());
+ Configuration conf =
ConfigurationUtils.getGlobalConfiguration(ApplicationModel.defaultModel());
+ String messagePath =
conf.getString(McpConstant.SETTINGS_MCP_PATHS_MESSAGE, "/mcp/message");
+ sendEvent(responseObserver, ENDPOINT_EVENT_TYPE, messagePath +
"?sessionId=" + mcpServerSession.getId());
}
private void refreshSessionExpire(McpServerSession session) {
diff --git
a/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
b/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
index ee8b7cf0ce..bd92f8f870 100644
---
a/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
+++
b/dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
@@ -99,7 +100,11 @@ class DubboMcpSseTransportProviderTest {
transportProvider.handleRequest(responseObserver);
verify(httpRequest, times(1)).method();
- verify(responseObserver, times(1)).onNext(any(ServerSentEvent.class));
+ ArgumentCaptor<ServerSentEvent> captor =
ArgumentCaptor.forClass(ServerSentEvent.class);
+ verify(responseObserver, times(1)).onNext(captor.capture());
+ ServerSentEvent evt = captor.getValue();
+ Assertions.assertEquals("endpoint", evt.getEvent());
+ Assertions.assertTrue(((String)
evt.getData()).startsWith("/mcp/message?sessionId="));
}
@Test
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
index 7338271df6..deaba392b8 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
@@ -93,12 +93,13 @@ public final class ServerSentEventEncoder implements
HttpMessageEncoder {
@Override
public String contentType() {
- return httpMessageEncoder.contentType();
+ // An idea:sse use text/event-stream regardless of the underlying data
encoder...
+ return MediaType.TEXT_EVENT_STREAM.getName();
}
@Override
public MediaType mediaType() {
- return httpMessageEncoder.mediaType();
+ return MediaType.TEXT_EVENT_STREAM;
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoderTest.java
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoderTest.java
new file mode 100644
index 0000000000..3cf05b069e
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/test/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.message;
+
+import org.apache.dubbo.remoting.http12.exception.EncodeException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class ServerSentEventEncoderTest {
+
+ static class DummyJsonEncoder implements HttpMessageEncoder {
+ @Override
+ public void encode(OutputStream outputStream, Object data,
java.nio.charset.Charset charset)
+ throws EncodeException {
+ try {
+ if (data instanceof byte[]) {
+ outputStream.write((byte[]) data);
+ } else {
+ outputStream.write(String.valueOf(data).getBytes(charset));
+ }
+ } catch (Exception e) {
+ throw new EncodeException("encode error", e);
+ }
+ }
+
+ @Override
+ public MediaType mediaType() {
+ return MediaType.APPLICATION_JSON;
+ }
+
+ @Override
+ public boolean supports(String mediaType) {
+ return true;
+ }
+ }
+
+ @Test
+ void shouldUseTextEventStreamContentType() {
+ ServerSentEventEncoder sse = new ServerSentEventEncoder(new
DummyJsonEncoder());
+ Assertions.assertEquals(MediaType.TEXT_EVENT_STREAM, sse.mediaType());
+ Assertions.assertEquals(MediaType.TEXT_EVENT_STREAM.getName(),
sse.contentType());
+ }
+
+ @Test
+ void shouldEncodeServerSentEventFormat() {
+ ServerSentEventEncoder sse = new ServerSentEventEncoder(new
DummyJsonEncoder());
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ sse.encode(
+ bos,
+ ServerSentEvent.builder()
+ .event("message")
+ .data("{\"a\":1}")
+ .id("1")
+ .build(),
+ StandardCharsets.UTF_8);
+ byte[] bytes = bos.toByteArray();
+ String text = new String(bytes, StandardCharsets.UTF_8);
+ Assertions.assertTrue(text.contains("id:1\n"));
+ Assertions.assertTrue(text.contains("event:message\n"));
+ Assertions.assertTrue(text.contains("data:{\"a\":1}\n"));
+ Assertions.assertTrue(text.endsWith("\n"));
+ }
+}