This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 3e8d1f5387 Support returning JSON content when using SSE (#15464)
3e8d1f5387 is described below

commit 3e8d1f5387bfe23d52dae96679c9f199e11289ab
Author: Sean Yang <[email protected]>
AuthorDate: Mon Jun 23 09:39:11 2025 +0800

    Support returning JSON content when using SSE (#15464)
    
    * Support returning JSON content when using SSE
    
    * Update Http2SseServerChannelObserver.java
    
    * Update Http2SseServerChannelObserver.java
    
    ---------
    
    Co-authored-by: zrlw <[email protected]>
---
 .../http12/AbstractServerHttpChannelObserver.java  |  4 ++--
 .../tri/h12/ServerStreamServerCallListener.java    |  8 ++++++-
 .../h12/http1/Http1SseServerChannelObserver.java   | 26 ++++++++++++++++++++++
 .../h12/http2/Http2SseServerChannelObserver.java   | 26 ++++++++++++++++++++++
 4 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index 9d7f080223..c2473c85ab 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -166,13 +166,13 @@ public abstract class AbstractServerHttpChannelObserver<H extends HttpChannel> i
         if (message != null) {
             headers.set(HttpHeaderNames.CONTENT_TYPE.getKey(), responseEncoder.contentType());
         }
+        customizeHeaders(headers, throwable, message);
         if (data instanceof HttpResult) {
             HttpResult<?> result = (HttpResult<?>) data;
             if (result.getHeaders() != null) {
                 headers.set(result.getHeaders());
             }
         }
-        customizeHeaders(headers, throwable, message);
         return metadata;
     }
 
@@ -198,7 +198,7 @@ public abstract class AbstractServerHttpChannelObserver<H extends HttpChannel> i
         }
     }
 
-    protected final HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
+    protected HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
         if (statusCode < 200 || statusCode == 204 || statusCode == 304) {
             return null;
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
index 8c6a83f37a..8315ff7aba 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.protocol.tri.h12;
 
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.http12.HttpResult;
 import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcInvocation;
@@ -29,7 +30,12 @@ public class ServerStreamServerCallListener extends AbstractServerCallListener {
     }
 
     @Override
-    public void onReturn(Object value) {}
+    public void onReturn(Object value) {
+        if (value instanceof HttpResult) {
+            responseObserver.onNext(value);
+            responseObserver.onCompleted();
+        }
+    }
 
     @Override
     public void onMessage(Object message) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
index 9b62aa6762..7499a16c95 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
@@ -20,12 +20,16 @@ import org.apache.dubbo.remoting.http12.HttpChannel;
 import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.HttpResult;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
 import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
 
 public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
 
+    private HttpMessageEncoder originalResponseEncoder;
+
     public Http1SseServerChannelObserver(HttpChannel httpChannel) {
         super(httpChannel);
     }
@@ -33,6 +37,7 @@ public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
     @Override
     public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
         super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
+        this.originalResponseEncoder = responseEncoder;
     }
 
     @Override
@@ -49,4 +54,25 @@ public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
                 .header(HttpHeaderNames.TRANSFER_ENCODING.getKey(), HttpConstants.CHUNKED)
                 .header(HttpHeaderNames.CACHE_CONTROL.getKey(), HttpConstants.NO_CACHE);
     }
+
+    @Override
+    protected HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
+        if (data instanceof HttpResult) {
+            data = ((HttpResult<?>) data).getBody();
+
+            if (data == null && statusCode != 200) {
+                return null;
+            }
+
+            HttpOutputMessage message = encodeHttpOutputMessage(data);
+            try {
+                originalResponseEncoder.encode(message.getBody(), data);
+            } catch (Throwable t) {
+                message.close();
+                throw t;
+            }
+            return message;
+        }
+        return super.buildMessage(statusCode, data);
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index 3dd5fe9cec..dfc50c9287 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -19,6 +19,8 @@ package org.apache.dubbo.rpc.protocol.tri.h12.http2;
 import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.HttpResult;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
 import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
@@ -26,6 +28,8 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
 
 public final class Http2SseServerChannelObserver extends Http2StreamServerChannelObserver {
 
+    private HttpMessageEncoder originalResponseEncoder;
+
     public Http2SseServerChannelObserver(FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
         super(frameworkModel, h2StreamChannel);
     }
@@ -33,6 +37,7 @@ public final class Http2SseServerChannelObserver extends Http2StreamServerChanne
     @Override
     public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
         super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
+        this.originalResponseEncoder = responseEncoder;
     }
 
     @Override
@@ -41,6 +46,27 @@ public final class Http2SseServerChannelObserver extends Http2StreamServerChanne
                 .header(HttpHeaderNames.CACHE_CONTROL.getKey(), HttpConstants.NO_CACHE);
     }
 
+    @Override
+    protected HttpOutputMessage buildMessage(int statusCode, Object data) throws Throwable {
+        if (data instanceof HttpResult) {
+            data = ((HttpResult<?>) data).getBody();
+
+            if (data == null && statusCode != 200) {
+                return null;
+            }
+
+            HttpOutputMessage message = encodeHttpOutputMessage(data);
+            try {
+                originalResponseEncoder.encode(message.getBody(), data);
+            } catch (Throwable t) {
+                message.close();
+                throw t;
+            }
+            return message;
+        }
+        return super.buildMessage(statusCode, data);
+    }
+
     @Override
     protected void doOnCompleted(Throwable throwable) {
         // if throwable is not null, the header will be flushed by super.doOnCompleted(throwable)

Reply via email to