This is an automated email from the ASF dual-hosted git repository.

zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 2daf878fc5 opt triple remote flow controller (#15983)
2daf878fc5 is described below

commit 2daf878fc5f470db0e3699596c67d4bed996578c
Author: earthchen <[email protected]>
AuthorDate: Fri Jan 16 16:53:32 2026 +0800

    opt triple remote flow controller (#15983)
    
    * opt triple remote flow controller
    
    * opt triple remote flow controller
    
    ---------
    
    Co-authored-by: heliang <[email protected]>
---
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  7 ++-
 .../transport/TripleHttp2LocalFlowController.java  |  5 ++
 .../transport/TripleHttp2RemoteFlowController.java | 73 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 918bb82024..ff599369db 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -43,6 +43,7 @@ import org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportL
 import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleGoAwayHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2LocalFlowController;
+import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2RemoteFlowController;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
 import org.apache.dubbo.rpc.protocol.tri.websocket.DefaultWebSocketServerTransportListenerFactory;
@@ -127,7 +128,8 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
     private Http2Connection createHttp2ClientConnection(TripleConfig tripleConfig) {
         Http2Connection connection = new DefaultHttp2Connection(false);
         float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
-        connection.local().flowController(new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
+        connection.local().flowController(TripleHttp2LocalFlowController.newController(connection, windowUpdateRatio));
+        connection.remote().flowController(TripleHttp2RemoteFlowController.newController(connection));
         return connection;
     }
 
@@ -264,7 +266,8 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
     private Http2Connection createHttp2ServerConnection(TripleConfig tripleConfig) {
         Http2Connection connection = new DefaultHttp2Connection(true);
         float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
-        connection.local().flowController(new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
+        connection.local().flowController(TripleHttp2LocalFlowController.newController(connection, windowUpdateRatio));
+        connection.remote().flowController(TripleHttp2RemoteFlowController.newController(connection));
         return connection;
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
index aaea399401..1e54f9dbfc 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri.transport;
 
 import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
 import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
 
 /**
  * Custom HTTP/2 local flow controller for Triple protocol with manual flow control.
@@ -62,4 +63,8 @@ public class TripleHttp2LocalFlowController extends DefaultHttp2LocalFlowControl
     public TripleHttp2LocalFlowController(Http2Connection connection, float windowUpdateRatio) {
         super(connection, windowUpdateRatio, true);
     }
+
+    public static Http2LocalFlowController newController(Http2Connection connection, float windowUpdateRatio) {
+        return new TripleHttp2LocalFlowController(connection, windowUpdateRatio);
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2RemoteFlowController.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2RemoteFlowController.java
new file mode 100644
index 0000000000..55c8ab6755
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2RemoteFlowController.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2RemoteFlowController;
+import io.netty.handler.codec.http2.StreamByteDistributor;
+import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
+
+/**
+ * Triple-specific implementation of {@link Http2RemoteFlowController}.
+ *
+ * <p>This class extends the {@link DefaultHttp2RemoteFlowController} to provide
+ * flow control management for Triple (Dubbo's gRPC-compatible protocol) connections.
+ * It coordinates the distribution of outbound flow-controlled bytes across all
+ * active streams within a connection.
+ *
+ * <p>The controller utilizes a {@link WeightedFairQueueByteDistributor} to ensure
+ * that bandwidth is allocated based on stream weights and priorities while
+ * maintaining fairness to prevent stream starvation.
+ *
+ * @see DefaultHttp2RemoteFlowController
+ * @see WeightedFairQueueByteDistributor
+ */
+public class TripleHttp2RemoteFlowController extends DefaultHttp2RemoteFlowController {
+
+    /**
+     * Constructs a new TripleHttp2RemoteFlowController.
+     *
+     * @param connection the {@link Http2Connection} to be managed.
+     * @param streamByteDistributor the distributor responsible for determining how
+     * available bytes are allocated among streams.
+     */
+    public TripleHttp2RemoteFlowController(Http2Connection connection, StreamByteDistributor streamByteDistributor) {
+        super(connection, streamByteDistributor);
+    }
+
+    /**
+     * Factory method to create a pre-configured flow controller optimized for Triple performance.
+     *
+     * <p>Configuration details:
+     * <ul>
+     * <li>Uses {@link WeightedFairQueueByteDistributor} for weighted-fair resource allocation.</li>
+     * <li>Sets {@code allocationQuantum} to 16KB. This setting reduces scheduling overhead
+     * by ensuring each stream is allocated a meaningful chunk of data before switching
+     * contexts, which significantly improves throughput in high-load scenarios.</li>
+     * </ul>
+     *
+     * @param connection the {@link Http2Connection} for which the controller will be created.
+     * @return a fully initialized {@link Http2RemoteFlowController} instance.
+     */
+    public static Http2RemoteFlowController newController(Http2Connection connection) {
+        WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
+        // Optimization: 16KB quantum size to balance fairness and high-throughput performance.
+        dist.allocationQuantum(16 * 1024);
+        return new TripleHttp2RemoteFlowController(connection, dist);
+    }
+}

Reply via email to