This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 770c8ad14d Fix StreamObserver object calls onCompleted() without first 
calling onNext() will fail. (#15466)
770c8ad14d is described below

commit 770c8ad14d7bbe633fab226545328a7d94d0ac81
Author: stellar <[email protected]>
AuthorDate: Thu Jun 19 16:12:16 2025 +0800

    Fix StreamObserver object calls onCompleted() without first calling 
onNext() will fail. (#15466)
---
 .../dubbo/remoting/http12/AbstractServerHttpChannelObserver.java | 7 +++++++
 .../protocol/tri/h12/http1/Http1SseServerChannelObserver.java    | 8 ++++++++
 .../protocol/tri/h12/http2/Http2SseServerChannelObserver.java    | 9 +++++++++
 3 files changed, 24 insertions(+)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index e6c4277bb4..9d7f080223 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -188,6 +188,9 @@ public abstract class AbstractServerHttpChannelObserver<H 
extends HttpChannel> i
     }
 
     protected final void sendMetadata(HttpMetadata metadata) {
+        if (headerSent) {
+            return;
+        }
         getHttpChannel().writeHeader(metadata);
         headerSent = true;
         if (LOGGER.isDebugEnabled()) {
@@ -327,6 +330,10 @@ public abstract class AbstractServerHttpChannelObserver<H 
extends HttpChannel> i
         return responseEncoder.contentType();
     }
 
+    protected boolean isHeaderSent() {
+        return headerSent;
+    }
+
     protected void customizeTrailers(HttpHeaders headers, Throwable throwable) 
{
         List<BiConsumer<HttpHeaders, Throwable>> trailersCustomizers = 
this.trailersCustomizers;
         if (trailersCustomizers != null) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
index 867bc0ff7e..9b62aa6762 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
@@ -35,6 +35,14 @@ public class Http1SseServerChannelObserver extends 
Http1ServerChannelObserver {
         super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
     }
 
+    @Override
+    protected void doOnCompleted(Throwable throwable) {
+        if (!isHeaderSent()) {
+            sendMetadata(encodeHttpMetadata(true));
+        }
+        super.doOnCompleted(throwable);
+    }
+
     @Override
     protected HttpMetadata encodeHttpMetadata(boolean endStream) {
         return super.encodeHttpMetadata(endStream)
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index 807d74da52..3dd5fe9cec 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -40,4 +40,13 @@ public final class Http2SseServerChannelObserver extends 
Http2StreamServerChanne
         return super.encodeHttpMetadata(endStream)
                 .header(HttpHeaderNames.CACHE_CONTROL.getKey(), 
HttpConstants.NO_CACHE);
     }
+
+    @Override
+    protected void doOnCompleted(Throwable throwable) {
+        // if throwable is not null, the header will be flushed by 
super.doOnCompleted(throwable)
+        if (!isHeaderSent() && throwable == null) {
+            sendMetadata(encodeHttpMetadata(true));
+        }
+        super.doOnCompleted(throwable);
+    }
 }

Reply via email to