This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new e49b8b2411 When a header frame with an end_stream flag is received,
the close method of the streaming decoder is called (#14313)
e49b8b2411 is described below
commit e49b8b241191fc6bd2b3ab5b3fcdcc855a541879
Author: TomlongTK <[email protected]>
AuthorDate: Fri Jun 14 11:07:23 2024 +0800
When a header frame with an end_stream flag is received, the close method
of the streaming decoder is called (#14313)
---
.../http12/AbstractServerHttpChannelObserver.java | 5 +++++
.../h12/grpc/GrpcHttp2ServerTransportListener.java | 2 +-
.../http2/GenericHttp2ServerTransportListener.java | 21 +++------------------
3 files changed, 9 insertions(+), 19 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index fb9c54f79c..21c820055c 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -138,6 +138,11 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
if (httpMetadata == null) {
return;
}
+ if (!headerSent) {
+ HttpHeaders headers = httpMetadata.headers();
+ headers.set(HttpHeaderNames.STATUS.getName(),
resolveStatusCode(throwable));
+ headers.set(HttpHeaderNames.CONTENT_TYPE.getName(),
responseEncoder.contentType());
+ }
trailersCustomizer.accept(httpMetadata.headers(), throwable);
getHttpChannel().writeHeader(httpMetadata);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 1b25dc57a1..d7be95ee66 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -92,8 +92,8 @@ public class GrpcHttp2ServerTransportListener extends
GenericHttp2ServerTranspor
@Override
protected void onMetadataCompletion(Http2Header metadata) {
- super.onMetadataCompletion(metadata);
processGrpcHeaders(metadata);
+ super.onMetadataCompletion(metadata);
}
private void processGrpcHeaders(Http2Header metadata) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 9c46944ccf..e6c58d6fb0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -19,12 +19,10 @@ package org.apache.dubbo.rpc.protocol.tri.h12.http2;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
-import org.apache.dubbo.remoting.http12.HttpMethods;
import org.apache.dubbo.remoting.http12.h2.CancelStreamException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
-import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
@@ -50,15 +48,11 @@ import
org.apache.dubbo.rpc.protocol.tri.h12.ServerStreamServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.h12.StreamingHttpMessageListener;
import org.apache.dubbo.rpc.protocol.tri.h12.UnaryServerCallListener;
-import java.io.ByteArrayInputStream;
import java.util.concurrent.Executor;
public class GenericHttp2ServerTransportListener extends
AbstractServerTransportListener<Http2Header, Http2InputMessage>
implements Http2TransportListener {
- private static final Http2InputMessage EMPTY_MESSAGE =
- new Http2InputMessageFrame(new ByteArrayInputStream(new byte[0]),
true);
-
private final ExecutorSupport executorSupport;
private final StreamingDecoder streamingDecoder;
private final FrameworkModel frameworkModel;
@@ -93,18 +87,6 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
return new SerializingExecutor(executorSupport.getExecutor(metadata));
}
- @Override
- protected void doOnMetadata(Http2Header metadata) {
- if (metadata.isEndStream()) {
- if (!HttpMethods.supportBody(metadata.method())) {
- super.doOnMetadata(metadata);
- doOnData(EMPTY_MESSAGE);
- }
- return;
- }
- super.doOnMetadata(metadata);
- }
-
@Override
protected HttpMessageListener buildHttpMessageListener() {
RpcInvocationBuildContext context = getContext();
@@ -180,6 +162,9 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
protected void onMetadataCompletion(Http2Header metadata) {
serverChannelObserver.setResponseEncoder(getContext().getHttpMessageEncoder());
serverChannelObserver.request(1);
+ if (metadata.isEndStream()) {
+ getStreamingDecoder().close();
+ }
}
@Override