This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 6a79caf optimize triple server send frame error handler (#8922)
6a79caf is described below
commit 6a79caffe59396acea7ebacecbc4388917065512
Author: earthchen <[email protected]>
AuthorDate: Mon Sep 27 11:15:43 2021 +0800
optimize triple server send frame error handler (#8922)
---
.../dubbo/rpc/protocol/tri/ServerTransportObserver.java | 11 +++++------
.../rpc/protocol/tri/TripleHttp2FrameServerHandler.java | 14 ++------------
2 files changed, 7 insertions(+), 18 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index fdb5085..552ce7e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
@@ -32,16 +31,15 @@ import io.netty.handler.codec.http2.Http2Error;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
public class ServerTransportObserver implements TransportObserver {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerTransportObserver.class);
private final ChannelHandlerContext ctx;
- private final ChannelPromise promise;
private boolean headerSent = false;
private boolean resetSent = false;
- public ServerTransportObserver(ChannelHandlerContext ctx, ChannelPromise
promise) {
+ public ServerTransportObserver(ChannelHandlerContext ctx) {
this.ctx = ctx;
- this.promise = promise;
}
@Override
@@ -58,10 +56,11 @@ public class ServerTransportObserver implements
TransportObserver {
headers.status(OK.codeAsText());
headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(),
TripleConstant.CONTENT_PROTO);
}
+ // If endStream is true, the channel will be closed, so you cannot
listen for errors and continue sending any frame
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
.addListener(future -> {
if (!future.isSuccess()) {
- LOGGER.warn("write header error", future.cause());
+ LOGGER.warn("send header error endStream=" +
endStream, future.cause());
}
});
}
@@ -89,7 +88,7 @@ public class ServerTransportObserver implements
TransportObserver {
ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
.addListener(future -> {
if (!future.isSuccess()) {
- LOGGER.warn("write data error", future.cause());
+ LOGGER.warn("send data error endStream=" + endStream,
future.cause());
}
});
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 34ee52f..e3d0421 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -33,7 +33,6 @@ import
org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -216,16 +215,11 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
stream = AbstractServerStream.unary(invoker.getUrl());
}
Channel channel = ctx.channel();
- ChannelPromise promise = channel.newPromise();
- promise.addListener(future -> {
- if (!future.isSuccess()) {
- exceptionCaught(ctx, future.cause());
- }
- });
+ // You can add listeners to ChannelPromise here if you want to listen
for the result of sending a frame
stream.service(providerModel.getServiceModel())
.invoker(invoker)
.methodName(methodName)
- .subscribe(new ServerTransportObserver(ctx, promise));
+ .subscribe(new ServerTransportObserver(ctx));
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
@@ -250,8 +244,4 @@ public class TripleHttp2FrameServerHandler extends
ChannelDuplexHandler {
return CommonConstants.$INVOKE.equals(methodName) ||
CommonConstants.$INVOKE_ASYNC.equals(methodName);
}
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
- super.write(ctx, msg, promise);
- }
}