This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 837b9c5515 remove duplicate flowcontroller listener (#11642)
837b9c5515 is described below

commit 837b9c5515936ef16710743b54783f53c0581e4f
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Sat Feb 25 18:26:47 2023 +0800

    remove duplicate flowcontroller listener (#11642)
    
    * remove duplicate flowcontroller listener
    
    * remove duplicate flowcontroller listener
---
 .../protocol/tri/TripleHttp2FrameCodecBuilder.java | 70 ++++++++++++++++++++++
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  9 ++-
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java
new file mode 100644
index 0000000000..217b8c1496
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2RemoteFlowController;
+import org.apache.dubbo.common.utils.Assert;
+
+import java.util.function.Consumer;
+
+public class TripleHttp2FrameCodecBuilder extends Http2FrameCodecBuilder {
+
+    TripleHttp2FrameCodecBuilder(Http2Connection connection) {
+        connection(connection);
+    }
+
+    public static TripleHttp2FrameCodecBuilder fromConnection(Http2Connection 
connection) {
+        return new TripleHttp2FrameCodecBuilder(connection);
+    }
+
+    public static TripleHttp2FrameCodecBuilder forClient() {
+        return forClient(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
+    }
+
+    public static TripleHttp2FrameCodecBuilder forClient(int 
maxReservedStreams) {
+        return fromConnection(new DefaultHttp2Connection(false, 
maxReservedStreams));
+    }
+
+    public static TripleHttp2FrameCodecBuilder forServer() {
+        return forServer(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
+    }
+
+    public static TripleHttp2FrameCodecBuilder forServer(int 
maxReservedStreams) {
+        return fromConnection(new DefaultHttp2Connection(true, 
maxReservedStreams));
+    }
+
+    public TripleHttp2FrameCodecBuilder 
customizeConnection(Consumer<Http2Connection> connectionCustomizer) {
+        Http2Connection connection = this.connection();
+        Assert.notNull(connection, "connection cannot be null.");
+        connectionCustomizer.accept(connection);
+        return this;
+    }
+
+    public TripleHttp2FrameCodecBuilder 
remoteFlowController(Http2RemoteFlowController remoteFlowController) {
+        return this.customizeConnection((connection) -> 
connection.remote().flowController(remoteFlowController));
+    }
+
+    public TripleHttp2FrameCodecBuilder 
localFlowController(Http2LocalFlowController localFlowController) {
+        return this.customizeConnection((connection) -> 
connection.local().flowController(localFlowController));
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index d13e4bfcd2..321af9d614 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -43,7 +43,6 @@ import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http2.Http2FrameCodec;
-import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
 import io.netty.handler.codec.http2.Http2FrameLogger;
 import io.netty.handler.codec.http2.Http2MultiplexHandler;
 import io.netty.handler.codec.http2.Http2Settings;
@@ -113,7 +112,8 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
         } else {
             headFilters = Collections.emptyList();
         }
-        final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
+        final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
+            .customizeConnection((connection) -> 
connection.remote().flowController(new TriHttp2RemoteFlowController(connection, 
url.getOrDefaultApplicationModel())))
             .gracefulShutdownTimeoutMillis(10000)
             .initialSettings(new Http2Settings().headerTableSize(
                     config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 
DEFAULT_SETTING_HEADER_LIST_SIZE))
@@ -127,7 +127,6 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
             .frameLogger(SERVER_LOGGER)
             .build();
         ExecutorSupport executorSupport = 
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
-        codec.connection().remote().flowController(new 
TriHttp2RemoteFlowController(codec.connection(), 
url.getOrDefaultApplicationModel()));
         
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
         TripleWriteQueue writeQueue = new TripleWriteQueue();
         final Http2MultiplexHandler handler = new Http2MultiplexHandler(
@@ -152,7 +151,8 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
 
     @Override
     public void configClientPipeline(URL url, ChannelOperator operator, 
ContextOperator contextOperator) {
-        final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
+        final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forClient()
+            .customizeConnection((connection) -> 
connection.remote().flowController(new TriHttp2RemoteFlowController(connection, 
url.getOrDefaultApplicationModel())))
             .gracefulShutdownTimeoutMillis(10000)
             .initialSettings(new Http2Settings().headerTableSize(
                     config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 
DEFAULT_SETTING_HEADER_LIST_SIZE))
@@ -166,7 +166,6 @@ public class TripleHttp2Protocol extends AbstractWireProtocol implements ScopeMo
                     DEFAULT_MAX_HEADER_LIST_SIZE)))
             .frameLogger(CLIENT_LOGGER)
             .build();
-        codec.connection().remote().flowController(new 
TriHttp2RemoteFlowController(codec.connection(), 
url.getOrDefaultApplicationModel()));
         
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
         final Http2MultiplexHandler handler = new Http2MultiplexHandler(
             new TripleClientHandler(frameworkModel));

Reply via email to