This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new d2c0478a0a Close channel handler context after the channel written 
operation is completed and Remove overflow check process(#15518)
d2c0478a0a is described below

commit d2c0478a0a182dba392cd76306a076c51617aa00
Author: zrlw <[email protected]>
AuthorDate: Mon Jul 7 06:20:54 2025 +0800

    Close channel handler context after the channel written operation is 
completed and Remove overflow check process(#15518)
    
    * Close channel handler context when promise is done
    
    * Just log warning message when window size of connectionState is zero
    
    * Log debug message for Http2Headers and Data packets
    
    * Close channel handler context after the pongHeader write operation is 
completed
    
    * Close channel handler context after the goAwayFrame write operation is 
completed
    
    * Shut down QUIC stream channel output after the headers frame write 
operation is completed
    
    * Remove overflow check process to align with Netty http2 remote flow 
controller
    
    * Fix Http3ClientFrameCodec write method to set shutdownOutput promise for 
Http2HeadersFrame
    
    * Fix promise-setting code in Http3ClientFrameCodec and 
NettyHttp3FrameCodec
---
 .../http3/netty4/NettyHttp3FrameCodec.java         | 54 ++++++++++++++--------
 .../protocol/tri/TriHttp2RemoteFlowController.java | 21 ++-------
 .../rpc/protocol/tri/h3/Http3ClientFrameCodec.java | 30 ++++++++++--
 .../tri/stream/AbstractTripleClientStream.java     |  6 +++
 .../protocol/tri/transport/GracefulShutdown.java   | 11 +++--
 5 files changed, 79 insertions(+), 43 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
 
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
index 9ef885ed3e..dfa8b6d826 100644
--- 
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
@@ -30,7 +30,8 @@ import java.net.SocketAddress;
 
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandler;
@@ -67,8 +68,12 @@ public class NettyHttp3FrameCodec extends 
Http3RequestStreamInboundHandler imple
         Http3Headers pongHeader = new DefaultHttp3Headers(false);
         pongHeader.set(TRI_PING, "0");
         pongHeader.set(PseudoHeaderName.STATUS.value(), 
HttpStatus.OK.getStatusString());
-        ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
-        ctx.close();
+        ChannelFuture future = ctx.write(new 
DefaultHttp3HeadersFrame(pongHeader), ctx.newPromise());
+        if (future.isDone()) {
+            ctx.close();
+        } else {
+            future.addListener((ChannelFutureListener) f -> ctx.close());
+        }
     }
 
     @Override
@@ -91,32 +96,43 @@ public class NettyHttp3FrameCodec extends 
Http3RequestStreamInboundHandler imple
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
         if (msg instanceof Http2Header) {
             Http2Header headers = (Http2Header) msg;
+            if (headers.isEndStream()) {
+                ChannelFuture future = ctx.write(
+                        new 
DefaultHttp3HeadersFrame(((NettyHttpHeaders<Http3Headers>) 
headers.headers()).getHeaders()),
+                        ctx.newPromise());
+                if (future.isDone()) {
+                    ctx.close(promise);
+                } else {
+                    future.addListener((ChannelFutureListener) f -> 
ctx.close(promise));
+                }
+                return;
+            }
             ctx.write(
                     new 
DefaultHttp3HeadersFrame(((NettyHttpHeaders<Http3Headers>) 
headers.headers()).getHeaders()),
                     promise);
-            if (headers.isEndStream()) {
-                ctx.close();
-            }
         } else if (msg instanceof Http2OutputMessage) {
             Http2OutputMessage message = (Http2OutputMessage) msg;
-            try {
-                OutputStream body = message.getBody();
+            OutputStream body = message.getBody();
+            assert body instanceof ByteBufOutputStream || body == null;
+            if (message.isEndStream()) {
                 if (body == null) {
-                    Http3DataFrame frame = new 
DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER);
-                    ctx.write(frame, promise);
+                    ctx.close(promise);
                     return;
                 }
-                if (body instanceof ByteBufOutputStream) {
-                    Http3DataFrame frame = new 
DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer());
-                    ctx.write(frame, promise);
-                    return;
-                }
-            } finally {
-                if (message.isEndStream()) {
-                    ctx.close();
+                ChannelFuture future =
+                        ctx.write(new 
DefaultHttp3DataFrame(((ByteBufOutputStream) body).buffer()), ctx.newPromise());
+                if (future.isDone()) {
+                    ctx.close(promise);
+                } else {
+                    future.addListener((ChannelFutureListener) f -> 
ctx.close(promise));
                 }
+                return;
+            }
+            if (body == null) {
+                promise.trySuccess();
+                return;
             }
-            throw new IllegalArgumentException("Http2OutputMessage body must 
be ByteBufOutputStream");
+            ctx.write(new DefaultHttp3DataFrame(((ByteBufOutputStream) 
body).buffer()), promise);
         } else {
             ctx.write(msg, promise);
         }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
index e224fc4b8f..2b23e17664 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2RemoteFlowController.java
@@ -407,10 +407,8 @@ public class TriHttp2RemoteFlowController implements 
Http2RemoteFlowController {
                 if (cancelled) {
                     cancel(INTERNAL_ERROR, cause);
                 }
-                if (monitor.isOverFlowControl()) {
-                    cause = new Throwable();
-                    cancel(FLOW_CONTROL_ERROR, cause);
-                }
+
+                // does not check overflow anymore: Let receiver continue 
receiving the pending bytes.
             }
             return writtenBytes;
         }
@@ -670,14 +668,6 @@ public class TriHttp2RemoteFlowController implements 
Http2RemoteFlowController {
         final boolean isWritableConnection() {
             return connectionState.windowSize() - totalPendingBytes > 0 && 
isChannelWritable();
         }
-
-        final boolean isOverFlowControl() {
-            if (connectionState.windowSize() == 0) {
-                return true;
-            } else {
-                return false;
-            }
-        }
     }
 
     /**
@@ -777,13 +767,8 @@ public class TriHttp2RemoteFlowController implements 
Http2RemoteFlowController {
                 checkAllWritabilityChanged();
             } else if (isWritable(state) != state.markedWritability()) {
                 notifyWritabilityChanged(state);
-            } else if (isOverFlowControl()) {
-                throw streamError(
-                        state.stream().id(),
-                        FLOW_CONTROL_ERROR,
-                        "TotalPendingBytes size overflow for stream: %d",
-                        state.stream().id());
             }
+            // does not check overflow anymore: Let receiver continue 
receiving the pending bytes.
         }
 
         private void checkAllWritabilityChanged() throws Http2Exception {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
index da4e9cd9e9..82e41bd333 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
@@ -26,6 +26,8 @@ import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
 
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
@@ -99,14 +101,36 @@ public class Http3ClientFrameCodec extends 
ChannelDuplexHandler {
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
         if (msg instanceof Http2HeadersFrame) {
             Http2HeadersFrame frame = (Http2HeadersFrame) msg;
-            ctx.write(new DefaultHttp3HeadersFrame(new 
Http3HeadersAdapter(frame.headers())), promise);
             if (frame.isEndStream()) {
-                ((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
+                ChannelFuture future = ctx.write(
+                        new DefaultHttp3HeadersFrame(new 
Http3HeadersAdapter(frame.headers())), ctx.newPromise());
+                if (future.isDone()) {
+                    ((QuicStreamChannel) 
ctx.channel()).shutdownOutput(promise);
+                } else {
+                    future.addListener(
+                            (ChannelFutureListener) f -> ((QuicStreamChannel) 
ctx.channel()).shutdownOutput(promise));
+                }
+                return;
             }
+            ctx.write(new DefaultHttp3HeadersFrame(new 
Http3HeadersAdapter(frame.headers())), promise);
         } else if (msg instanceof Http2DataFrame) {
             Http2DataFrame frame = (Http2DataFrame) msg;
             if (frame.isEndStream()) {
-                ((QuicStreamChannel) ctx.channel()).shutdownOutput(promise);
+                if (Unpooled.EMPTY_BUFFER.equals(frame.content())) {
+                    ((QuicStreamChannel) 
ctx.channel()).shutdownOutput(promise);
+                    return;
+                }
+                ChannelFuture future = ctx.write(new 
DefaultHttp3DataFrame(frame.content()), ctx.newPromise());
+                if (future.isDone()) {
+                    ((QuicStreamChannel) 
ctx.channel()).shutdownOutput(promise);
+                } else {
+                    future.addListener(
+                            (ChannelFutureListener) f -> ((QuicStreamChannel) 
ctx.channel()).shutdownOutput(promise));
+                }
+                return;
+            }
+            if (Unpooled.EMPTY_BUFFER.equals(frame.content())) {
+                promise.trySuccess();
                 return;
             }
             ctx.write(new DefaultHttp3DataFrame(frame.content()), promise);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index 39b0df645a..89927fed3e 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -409,6 +409,9 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
 
         @Override
         public void onHeader(Http2Headers headers, boolean endStream) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("endStream: {} HEADERS: {}", endStream, headers);
+            }
             executor.execute(() -> {
                 if (endStream) {
                     if (!halfClosed) {
@@ -428,6 +431,9 @@ public abstract class AbstractTripleClientStream extends 
AbstractStream implemen
 
         @Override
         public void onData(ByteBuf data, boolean endStream) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("endStream: {} DATA: {}", endStream, 
data.toString(StandardCharsets.UTF_8));
+            }
             try {
                 executor.execute(() -> doOnData(data, endStream));
             } catch (Throwable t) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
index 29c0b6f44f..5e86b592c0 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
@@ -19,6 +19,8 @@ package org.apache.dubbo.rpc.protocol.tri.transport;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
@@ -66,9 +68,12 @@ public class GracefulShutdown {
         try {
             Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(
                     Http2Error.NO_ERROR, 
ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
-            ctx.writeAndFlush(goAwayFrame);
-            // TODO support customize graceful shutdown timeout mills
-            ctx.close(originPromise);
+            ChannelFuture future = ctx.writeAndFlush(goAwayFrame, 
ctx.newPromise());
+            if (future.isDone()) {
+                ctx.close(originPromise);
+            } else {
+                future.addListener((ChannelFutureListener) f -> 
ctx.close(originPromise));
+            }
         } catch (Exception e) {
             ctx.fireExceptionCaught(e);
         }

Reply via email to