This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new e49b8b2411 When a header frame with an end_stream flag is received, 
the close method of the streaming decoder is called (#14313)
e49b8b2411 is described below

commit e49b8b241191fc6bd2b3ab5b3fcdcc855a541879
Author: TomlongTK <[email protected]>
AuthorDate: Fri Jun 14 11:07:23 2024 +0800

    When a header frame with an end_stream flag is received, the close method 
of the streaming decoder is called (#14313)
---
 .../http12/AbstractServerHttpChannelObserver.java   |  5 +++++
 .../h12/grpc/GrpcHttp2ServerTransportListener.java  |  2 +-
 .../http2/GenericHttp2ServerTransportListener.java  | 21 +++------------------
 3 files changed, 9 insertions(+), 19 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index fb9c54f79c..21c820055c 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -138,6 +138,11 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
         if (httpMetadata == null) {
             return;
         }
+        if (!headerSent) {
+            HttpHeaders headers = httpMetadata.headers();
+            headers.set(HttpHeaderNames.STATUS.getName(), 
resolveStatusCode(throwable));
+            headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), 
responseEncoder.contentType());
+        }
         trailersCustomizer.accept(httpMetadata.headers(), throwable);
         getHttpChannel().writeHeader(httpMetadata);
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 1b25dc57a1..d7be95ee66 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -92,8 +92,8 @@ public class GrpcHttp2ServerTransportListener extends 
GenericHttp2ServerTranspor
 
     @Override
     protected void onMetadataCompletion(Http2Header metadata) {
-        super.onMetadataCompletion(metadata);
         processGrpcHeaders(metadata);
+        super.onMetadataCompletion(metadata);
     }
 
     private void processGrpcHeaders(Http2Header metadata) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 9c46944ccf..e6c58d6fb0 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -19,12 +19,10 @@ package org.apache.dubbo.rpc.protocol.tri.h12.http2;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.remoting.http12.HttpMethods;
 import org.apache.dubbo.remoting.http12.h2.CancelStreamException;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2Header;
 import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
-import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
 import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
 import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
@@ -50,15 +48,11 @@ import 
org.apache.dubbo.rpc.protocol.tri.h12.ServerStreamServerCallListener;
 import org.apache.dubbo.rpc.protocol.tri.h12.StreamingHttpMessageListener;
 import org.apache.dubbo.rpc.protocol.tri.h12.UnaryServerCallListener;
 
-import java.io.ByteArrayInputStream;
 import java.util.concurrent.Executor;
 
 public class GenericHttp2ServerTransportListener extends 
AbstractServerTransportListener<Http2Header, Http2InputMessage>
         implements Http2TransportListener {
 
-    private static final Http2InputMessage EMPTY_MESSAGE =
-            new Http2InputMessageFrame(new ByteArrayInputStream(new byte[0]), 
true);
-
     private final ExecutorSupport executorSupport;
     private final StreamingDecoder streamingDecoder;
     private final FrameworkModel frameworkModel;
@@ -93,18 +87,6 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
         return new SerializingExecutor(executorSupport.getExecutor(metadata));
     }
 
-    @Override
-    protected void doOnMetadata(Http2Header metadata) {
-        if (metadata.isEndStream()) {
-            if (!HttpMethods.supportBody(metadata.method())) {
-                super.doOnMetadata(metadata);
-                doOnData(EMPTY_MESSAGE);
-            }
-            return;
-        }
-        super.doOnMetadata(metadata);
-    }
-
     @Override
     protected HttpMessageListener buildHttpMessageListener() {
         RpcInvocationBuildContext context = getContext();
@@ -180,6 +162,9 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
     protected void onMetadataCompletion(Http2Header metadata) {
         
serverChannelObserver.setResponseEncoder(getContext().getHttpMessageEncoder());
         serverChannelObserver.request(1);
+        if (metadata.isEndStream()) {
+            getStreamingDecoder().close();
+        }
     }
 
     @Override

Reply via email to