This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 837b9c5515 remove duplicate flowcontroller listener (#11642)
837b9c5515 is described below
commit 837b9c5515936ef16710743b54783f53c0581e4f
Author: 一个不知名的Java靓仔 <[email protected]>
AuthorDate: Sat Feb 25 18:26:47 2023 +0800
remove duplicate flowcontroller listener (#11642)
* remove duplicate flowcontroller listener
* remove duplicate flowcontroller listener
---
.../protocol/tri/TripleHttp2FrameCodecBuilder.java | 70 ++++++++++++++++++++++
.../rpc/protocol/tri/TripleHttp2Protocol.java | 9 ++-
2 files changed, 74 insertions(+), 5 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java
new file mode 100644
index 0000000000..217b8c1496
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2RemoteFlowController;
+import org.apache.dubbo.common.utils.Assert;
+
+import java.util.function.Consumer;
+
+public class TripleHttp2FrameCodecBuilder extends Http2FrameCodecBuilder { // Http2FrameCodecBuilder subclass that is bound to an Http2Connection at construction, so the connection can be adjusted through the builder chain
+
+ TripleHttp2FrameCodecBuilder(Http2Connection connection) { // package-private constructor; the static factories below are the public entry points
+ connection(connection); // register the supplied connection with the inherited builder
+ }
+
+ public static TripleHttp2FrameCodecBuilder fromConnection(Http2Connection
connection) { // factory: wraps an already-created connection in a new builder
+ return new TripleHttp2FrameCodecBuilder(connection);
+ }
+
+ public static TripleHttp2FrameCodecBuilder forClient() { // client factory using Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS as maxReservedStreams
+ return forClient(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
+ }
+
+ public static TripleHttp2FrameCodecBuilder forClient(int
maxReservedStreams) { // client factory with a caller-chosen maxReservedStreams
+ return fromConnection(new DefaultHttp2Connection(false,
maxReservedStreams)); // first argument false: presumably Netty's "server" flag, i.e. a client-side connection - confirm against DefaultHttp2Connection
+ }
+
+ public static TripleHttp2FrameCodecBuilder forServer() { // server factory using Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS as maxReservedStreams
+ return forServer(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
+ }
+
+ public static TripleHttp2FrameCodecBuilder forServer(int
maxReservedStreams) { // server factory with a caller-chosen maxReservedStreams
+ return fromConnection(new DefaultHttp2Connection(true,
maxReservedStreams)); // first argument true: presumably Netty's "server" flag, i.e. a server-side connection - confirm against DefaultHttp2Connection
+ }
+
+ public TripleHttp2FrameCodecBuilder
customizeConnection(Consumer<Http2Connection> connectionCustomizer) { // applies connectionCustomizer to this builder's connection and returns this builder for chaining
+ Http2Connection connection = this.connection(); // read back the connection held by the inherited builder
+ Assert.notNull(connection, "connection cannot be null."); // guard before invoking the customizer; presumably throws on null - confirm in Dubbo's Assert
+ connectionCustomizer.accept(connection);
+ return this;
+ }
+
+ public TripleHttp2FrameCodecBuilder
remoteFlowController(Http2RemoteFlowController remoteFlowController) { // convenience: sets the flow controller on the connection's remote endpoint
+ return this.customizeConnection((connection) ->
connection.remote().flowController(remoteFlowController));
+ }
+
+ public TripleHttp2FrameCodecBuilder
localFlowController(Http2LocalFlowController localFlowController) { // convenience: sets the flow controller on the connection's local endpoint
+ return this.customizeConnection((connection) ->
connection.local().flowController(localFlowController));
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index d13e4bfcd2..321af9d614 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -43,7 +43,6 @@ import
org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2FrameCodec;
-import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
@@ -113,7 +112,8 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
} else {
headFilters = Collections.emptyList();
}
- final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
+ final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
+ .customizeConnection((connection) ->
connection.remote().flowController(new TriHttp2RemoteFlowController(connection,
url.getOrDefaultApplicationModel())))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings().headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY,
DEFAULT_SETTING_HEADER_LIST_SIZE))
@@ -127,7 +127,6 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
.frameLogger(SERVER_LOGGER)
.build();
ExecutorSupport executorSupport =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
- codec.connection().remote().flowController(new
TriHttp2RemoteFlowController(codec.connection(),
url.getOrDefaultApplicationModel()));
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
TripleWriteQueue writeQueue = new TripleWriteQueue();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
@@ -152,7 +151,8 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
@Override
public void configClientPipeline(URL url, ChannelOperator operator,
ContextOperator contextOperator) {
- final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
+ final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forClient()
+ .customizeConnection((connection) ->
connection.remote().flowController(new TriHttp2RemoteFlowController(connection,
url.getOrDefaultApplicationModel())))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings().headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY,
DEFAULT_SETTING_HEADER_LIST_SIZE))
@@ -166,7 +166,6 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(CLIENT_LOGGER)
.build();
- codec.connection().remote().flowController(new
TriHttp2RemoteFlowController(codec.connection(),
url.getOrDefaultApplicationModel()));
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
new TripleClientHandler(frameworkModel));