This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 770c8ad14d Fix StreamObserver object calls onCompleted() without first
calling onNext() will fail. (#15466)
770c8ad14d is described below
commit 770c8ad14d7bbe633fab226545328a7d94d0ac81
Author: stellar <[email protected]>
AuthorDate: Thu Jun 19 16:12:16 2025 +0800
Fix StreamObserver object calls onCompleted() without first calling
onNext() will fail. (#15466)
---
.../dubbo/remoting/http12/AbstractServerHttpChannelObserver.java | 7 +++++++
.../protocol/tri/h12/http1/Http1SseServerChannelObserver.java | 8 ++++++++
.../protocol/tri/h12/http2/Http2SseServerChannelObserver.java | 9 +++++++++
3 files changed, 24 insertions(+)
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index e6c4277bb4..9d7f080223 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -188,6 +188,9 @@ public abstract class AbstractServerHttpChannelObserver<H extends HttpChannel> i
}
protected final void sendMetadata(HttpMetadata metadata) {
+ if (headerSent) {
+ return;
+ }
getHttpChannel().writeHeader(metadata);
headerSent = true;
if (LOGGER.isDebugEnabled()) {
@@ -327,6 +330,10 @@ public abstract class AbstractServerHttpChannelObserver<H extends HttpChannel> i
return responseEncoder.contentType();
}
+ protected boolean isHeaderSent() {
+ return headerSent;
+ }
+
protected void customizeTrailers(HttpHeaders headers, Throwable throwable) {
List<BiConsumer<HttpHeaders, Throwable>> trailersCustomizers =
this.trailersCustomizers;
if (trailersCustomizers != null) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
index 867bc0ff7e..9b62aa6762 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
@@ -35,6 +35,14 @@ public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
}
+ @Override
+ protected void doOnCompleted(Throwable throwable) {
+ if (!isHeaderSent()) {
+ sendMetadata(encodeHttpMetadata(true));
+ }
+ super.doOnCompleted(throwable);
+ }
+
@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
return super.encodeHttpMetadata(endStream)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index 807d74da52..3dd5fe9cec 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -40,4 +40,13 @@ public final class Http2SseServerChannelObserver extends Http2StreamServerChanne
return super.encodeHttpMetadata(endStream)
.header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
+
+ @Override
+ protected void doOnCompleted(Throwable throwable) {
+ // if throwable is not null, the header will be flushed by super.doOnCompleted(throwable)
+ if (!isHeaderSent() && throwable == null) {
+ sendMetadata(encodeHttpMetadata(true));
+ }
+ super.doOnCompleted(throwable);
+ }
}