This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new d2c0478a0a Close channel handler context after the channel write
operation is completed and remove the overflow check (#15518)
d2c0478a0a is described below
commit d2c0478a0a182dba392cd76306a076c51617aa00
Author: zrlw <[email protected]>
AuthorDate: Mon Jul 7 06:20:54 2025 +0800
Close channel handler context after the channel write operation is
completed and remove the overflow check (#15518)
* Close channel handler context when promise is done
* Just log a warning message when the window size of connectionState is zero
* Log debug message for Http2Headers and Data packets
* Close channel handler context after the pongHeader write operation is
completed
* Close channel handler context after the goAwayFrame write operation is
completed
* Shut down QUIC stream channel output after the headers frame write operation
is completed
* Remove the overflow check to align with Netty's HTTP/2 remote flow
controller
* Fix Http3ClientFrameCodec write method to set shutdownOutput promise for
Http2HeadersFrame
* Fix the promise-setting code of Http3ClientFrameCodec and
NettyHttp3FrameCodec
---
.../http3/netty4/NettyHttp3FrameCodec.java | 54 ++++++++++++++--------
.../protocol/tri/TriHttp2RemoteFlowController.java | 21 ++-------
.../rpc/protocol/tri/h3/Http3ClientFrameCodec.java | 30 ++++++++++--
.../tri/stream/AbstractTripleClientStream.java | 6 +++
.../protocol/tri/transport/GracefulShutdown.java | 11 +++--
5 files changed, 79 insertions(+), 43 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
index 9ef885ed3e..dfa8b6d826 100644
---
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
@@ -30,7 +30,8 @@ import java.net.SocketAddress;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
@@ -67,8 +68,12 @@ public class NettyHttp3FrameCodec extends
Http3RequestStreamInboundHandler imple
Http3Headers pongHeader = new DefaultHttp3Headers(false);
pongHeader.set(TRI_PING, "0");
pongHeader.set(PseudoHeaderName.STATUS.value(),
HttpStatus.OK.getStatusString());
- ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
- ctx.close();
+ ChannelFuture future = ctx.write(new
DefaultHttp3HeadersFrame(pongHeader), ctx.newPromise());
+ if (future.isDone()) {
+ ctx.close();
+ } else {
+ future.addListener((ChannelFutureListener) f -> ctx.close());
+ }
}
@Override
@@ -91,32 +96,43 @@ public class NettyHttp3FrameCodec extends
Http3RequestStreamInboundHandler imple
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
if (msg instanceof Http2Header) {
Http2Header headers = (Http2Header) msg;
+ if (headers.isEndStream()) {
+ ChannelFuture future = ctx.write(
+ new
DefaultHttp3HeadersFrame(((NettyHttpHeaders<Http3Headers>)
headers.headers()).getHeaders()),
+ ctx.newPromise());
+ if (future.isDone()) {
+ ctx.close(promise);
+ } else {
+ future.addListener((ChannelFutureListener) f ->
ctx.close(promise));
+ }
+ return;
+ }
ctx.write(
new
DefaultHttp3HeadersFrame(((NettyHttpHeaders<Http3Headers>)
headers.headers()).getHeaders()),
promise);
- if (headers.isEndStream()) {
- ctx.close();
- }
} else if (msg instanceof Http2OutputMessage) {
Http2OutputMessage message = (Http2OutputMessage) msg;
- try {
- OutputStream body = message.getBody();
+ OutputStream body = message.getBody();
+ assert body instanceof ByteBufOutputStream || body == null;
+ if (message.isEndStream()) {
if (body == null) {
- Http3DataFrame frame = new
DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER);
- ctx.write(frame, promise);
+ ctx.close(promise);
return;
}
- if (body instanceof ByteBufOutputStream) {
- Http3DataFrame frame = new
DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer());
- ctx.write(frame, promise);
- return;
- }
- } finally {
- if (message.isEndStream()) {
- ctx.close();
+ ChannelFuture future =
+ ctx.write(new
DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer()), ctx.newPromise());
+ if (future.isDone()) {
+ ctx.close(promise);
+ } else {
+ future.addListener((ChannelFutureListener) f ->
ctx.close(promise));
}
+ return;
+ }
+ if (body == null) {
+ promise.trySuccess();
+ return;
}
- throw new IllegalArgumentException("Http2OutputMessage body must
be ByteBufOutputStream");
+ ctx.write(new DefaultHttp3DataFrame(((ByteBufOutputStream)
body).buffer()), promise);
} else {
ctx.write(msg, promise);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
index e224fc4b8f..2b23e17664 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
@@ -407,10 +407,8 @@ public class TriHttp2RemoteFlowController implements
Http2RemoteFlowController {
if (cancelled) {
cancel(INTERNAL_ERROR, cause);
}
- if (monitor.isOverFlowControl()) {
- cause = new Throwable();
- cancel(FLOW_CONTROL_ERROR, cause);
- }
+
+ // does not check overflow anymore: Let receiver continue
receiving the pending bytes.
}
return writtenBytes;
}
@@ -670,14 +668,6 @@ public class TriHttp2RemoteFlowController implements
Http2RemoteFlowController {
final boolean isWritableConnection() {
return connectionState.windowSize() - totalPendingBytes > 0 &&
isChannelWritable();
}
-
- final boolean isOverFlowControl() {
- if (connectionState.windowSize() == 0) {
- return true;
- } else {
- return false;
- }
- }
}
/**
@@ -777,13 +767,8 @@ public class TriHttp2RemoteFlowController implements
Http2RemoteFlowController {
checkAllWritabilityChanged();
} else if (isWritable(state) != state.markedWritability()) {
notifyWritabilityChanged(state);
- } else if (isOverFlowControl()) {
- throw streamError(
- state.stream().id(),
- FLOW_CONTROL_ERROR,
- "TotalPendingBytes size overflow for stream: %d",
- state.stream().id());
}
+ // does not check overflow anymore: Let receiver continue
receiving the pending bytes.
}
private void checkAllWritabilityChanged() throws Http2Exception {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
index da4e9cd9e9..82e41bd333 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
@@ -26,6 +26,8 @@ import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@@ -99,14 +101,36 @@ public class Http3ClientFrameCodec extends
ChannelDuplexHandler {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
if (msg instanceof Http2HeadersFrame) {
Http2HeadersFrame frame = (Http2HeadersFrame) msg;
- ctx.write(new DefaultHttp3HeadersFrame(new
Http3HeadersAdapter(frame.headers())), promise);
if (frame.isEndStream()) {
- ((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
+ ChannelFuture future = ctx.write(
+ new DefaultHttp3HeadersFrame(new
Http3HeadersAdapter(frame.headers())), ctx.newPromise());
+ if (future.isDone()) {
+ ((QuicStreamChannel)
ctx.channel()).shutdownOutput(promise);
+ } else {
+ future.addListener(
+ (ChannelFutureListener) f -> ((QuicStreamChannel)
ctx.channel()).shutdownOutput(promise));
+ }
+ return;
}
+ ctx.write(new DefaultHttp3HeadersFrame(new
Http3HeadersAdapter(frame.headers())), promise);
} else if (msg instanceof Http2DataFrame) {
Http2DataFrame frame = (Http2DataFrame) msg;
if (frame.isEndStream()) {
- ((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
+ if (Unpooled.EMPTY_BUFFER.equals(frame.content())) {
+ ((QuicStreamChannel)
ctx.channel()).shutdownOutput(promise);
+ return;
+ }
+ ChannelFuture future = ctx.write(new
DefaultHttp3DataFrame(frame.content()), ctx.newPromise());
+ if (future.isDone()) {
+ ((QuicStreamChannel)
ctx.channel()).shutdownOutput(promise);
+ } else {
+ future.addListener(
+ (ChannelFutureListener) f -> ((QuicStreamChannel)
ctx.channel()).shutdownOutput(promise));
+ }
+ return;
+ }
+ if (Unpooled.EMPTY_BUFFER.equals(frame.content())) {
+ promise.trySuccess();
return;
}
ctx.write(new DefaultHttp3DataFrame(frame.content()), promise);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index 39b0df645a..89927fed3e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -409,6 +409,9 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
@Override
public void onHeader(Http2Headers headers, boolean endStream) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("endStream: {} HEADERS: {}", endStream, headers);
+ }
executor.execute(() -> {
if (endStream) {
if (!halfClosed) {
@@ -428,6 +431,9 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
@Override
public void onData(ByteBuf data, boolean endStream) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("endStream: {} DATA: {}", endStream,
data.toString(StandardCharsets.UTF_8));
+ }
try {
executor.execute(() -> doOnData(data, endStream));
} catch (Throwable t) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
index 29c0b6f44f..5e86b592c0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
@@ -19,6 +19,8 @@ package org.apache.dubbo.rpc.protocol.tri.transport;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
@@ -66,9 +68,12 @@ public class GracefulShutdown {
try {
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(
Http2Error.NO_ERROR,
ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
- ctx.writeAndFlush(goAwayFrame);
- // TODO support customize graceful shutdown timeout mills
- ctx.close(originPromise);
+ ChannelFuture future = ctx.writeAndFlush(goAwayFrame,
ctx.newPromise());
+ if (future.isDone()) {
+ ctx.close(originPromise);
+ } else {
+ future.addListener((ChannelFutureListener) f ->
ctx.close(originPromise));
+ }
} catch (Exception e) {
ctx.fireExceptionCaught(e);
}