This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 3e8d1f5387 Support returning JSON content when using SSE (#15464)
3e8d1f5387 is described below
commit 3e8d1f5387bfe23d52dae96679c9f199e11289ab
Author: Sean Yang <[email protected]>
AuthorDate: Mon Jun 23 09:39:11 2025 +0800
Support returning JSON content when using SSE (#15464)
* Support returning JSON content when using SSE
* Update Http2SseServerChannelObserver.java
* Update Http2SseServerChannelObserver.java
---------
Co-authored-by: zrlw <[email protected]>
---
.../http12/AbstractServerHttpChannelObserver.java | 4 ++--
.../tri/h12/ServerStreamServerCallListener.java | 8 ++++++-
.../h12/http1/Http1SseServerChannelObserver.java | 26 ++++++++++++++++++++++
.../h12/http2/Http2SseServerChannelObserver.java | 26 ++++++++++++++++++++++
4 files changed, 61 insertions(+), 3 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index 9d7f080223..c2473c85ab 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -166,13 +166,13 @@ public abstract class AbstractServerHttpChannelObserver<H
extends HttpChannel> i
if (message != null) {
headers.set(HttpHeaderNames.CONTENT_TYPE.getKey(),
responseEncoder.contentType());
}
+ customizeHeaders(headers, throwable, message);
if (data instanceof HttpResult) {
HttpResult<?> result = (HttpResult<?>) data;
if (result.getHeaders() != null) {
headers.set(result.getHeaders());
}
}
- customizeHeaders(headers, throwable, message);
return metadata;
}
@@ -198,7 +198,7 @@ public abstract class AbstractServerHttpChannelObserver<H
extends HttpChannel> i
}
}
- protected final HttpOutputMessage buildMessage(int statusCode, Object
data) throws Throwable {
+ protected HttpOutputMessage buildMessage(int statusCode, Object data)
throws Throwable {
if (statusCode < 200 || statusCode == 204 || statusCode == 304) {
return null;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
index 8c6a83f37a..8315ff7aba 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/ServerStreamServerCallListener.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri.h12;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.http12.HttpResult;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
@@ -29,7 +30,12 @@ public class ServerStreamServerCallListener extends
AbstractServerCallListener {
}
@Override
- public void onReturn(Object value) {}
+ public void onReturn(Object value) {
+ if (value instanceof HttpResult) {
+ responseObserver.onNext(value);
+ responseObserver.onCompleted();
+ }
+ }
@Override
public void onMessage(Object message) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
index 9b62aa6762..7499a16c95 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
@@ -20,12 +20,16 @@ import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.HttpResult;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
+ private HttpMessageEncoder originalResponseEncoder;
+
public Http1SseServerChannelObserver(HttpChannel httpChannel) {
super(httpChannel);
}
@@ -33,6 +37,7 @@ public class Http1SseServerChannelObserver extends
Http1ServerChannelObserver {
@Override
public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
+ this.originalResponseEncoder = responseEncoder;
}
@Override
@@ -49,4 +54,25 @@ public class Http1SseServerChannelObserver extends
Http1ServerChannelObserver {
.header(HttpHeaderNames.TRANSFER_ENCODING.getKey(),
HttpConstants.CHUNKED)
.header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
+
+ @Override
+ protected HttpOutputMessage buildMessage(int statusCode, Object data)
throws Throwable {
+ if (data instanceof HttpResult) {
+ data = ((HttpResult<?>) data).getBody();
+
+ if (data == null && statusCode != 200) {
+ return null;
+ }
+
+ HttpOutputMessage message = encodeHttpOutputMessage(data);
+ try {
+ originalResponseEncoder.encode(message.getBody(), data);
+ } catch (Throwable t) {
+ message.close();
+ throw t;
+ }
+ return message;
+ }
+ return super.buildMessage(statusCode, data);
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index 3dd5fe9cec..dfc50c9287 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -19,6 +19,8 @@ package org.apache.dubbo.rpc.protocol.tri.h12.http2;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.HttpResult;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
@@ -26,6 +28,8 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
public final class Http2SseServerChannelObserver extends
Http2StreamServerChannelObserver {
+ private HttpMessageEncoder originalResponseEncoder;
+
public Http2SseServerChannelObserver(FrameworkModel frameworkModel,
H2StreamChannel h2StreamChannel) {
super(frameworkModel, h2StreamChannel);
}
@@ -33,6 +37,7 @@ public final class Http2SseServerChannelObserver extends
Http2StreamServerChanne
@Override
public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
+ this.originalResponseEncoder = responseEncoder;
}
@Override
@@ -41,6 +46,27 @@ public final class Http2SseServerChannelObserver extends
Http2StreamServerChanne
.header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
+ @Override
+ protected HttpOutputMessage buildMessage(int statusCode, Object data)
throws Throwable {
+ if (data instanceof HttpResult) {
+ data = ((HttpResult<?>) data).getBody();
+
+ if (data == null && statusCode != 200) {
+ return null;
+ }
+
+ HttpOutputMessage message = encodeHttpOutputMessage(data);
+ try {
+ originalResponseEncoder.encode(message.getBody(), data);
+ } catch (Throwable t) {
+ message.close();
+ throw t;
+ }
+ return message;
+ }
+ return super.buildMessage(statusCode, data);
+ }
+
@Override
protected void doOnCompleted(Throwable throwable) {
// if throwable is not null, the header will be flushed by
super.doOnCompleted(throwable)