This is an automated email from the ASF dual-hosted git repository.
crazyhzm pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 734c9c3f4a feat: decoupling WireProtocol by netty (#10976)
734c9c3f4a is described below
commit 734c9c3f4a08c68c1ef5fce9c7c879423d1b8780
Author: conghuhu <[email protected]>
AuthorDate: Tue Nov 22 11:27:08 2022 +0800
feat: decoupling WireProtocol by netty (#10976)
---
.../src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java | 9 ---------
.../java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java | 4 ++--
.../main/java/org/apache/dubbo/remoting/api/WireProtocol.java | 3 +--
.../test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java | 3 +--
.../dubbo/remoting/transport/netty4/NettyConnectionClient.java | 5 ++++-
.../dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java | 2 +-
.../org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java | 8 ++++++--
.../org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java | 8 --------
8 files changed, 15 insertions(+), 27 deletions(-)
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
index 71e6633eb3..bd4bc2db1e 100644
---
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
@@ -27,9 +27,6 @@ import org.apache.dubbo.remoting.api.pu.ChannelOperator;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.ssl.SslContext;
-
import java.util.ArrayList;
import java.util.List;
@@ -54,10 +51,4 @@ public class QosWireProtocol extends AbstractWireProtocol
implements ScopeModelA
operator.configChannelHandler(handlers);
}
-
- @Override
- public void configClientPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
-
- }
-
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
index 278712bcac..621b5e45dd 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
@@ -17,8 +17,8 @@
package org.apache.dubbo.remoting.api;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.api.pu.ChannelOperator;
-import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslContext;
public abstract class AbstractWireProtocol implements WireProtocol {
@@ -35,7 +35,7 @@ public abstract class AbstractWireProtocol implements
WireProtocol {
}
@Override
- public void configClientPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
+ public void configClientPipeline(URL url, ChannelOperator operator,
SslContext sslContext) {
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
index fe615f8210..f64b5fa6bd 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
@@ -21,7 +21,6 @@ import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.remoting.api.pu.ChannelOperator;
-import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslContext;
@SPI(scope = ExtensionScope.FRAMEWORK)
@@ -31,7 +30,7 @@ public interface WireProtocol {
void configServerProtocolHandler(URL url, ChannelOperator operator);
- void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext
sslContext);
+ void configClientPipeline(URL url, ChannelOperator operator, SslContext
sslContext);
void close();
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
index 5c69d9b945..bfb87e77c1 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.api;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.api.pu.ChannelOperator;
-import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslContext;
public class EmptyProtocol implements WireProtocol {
@@ -34,7 +33,7 @@ public class EmptyProtocol implements WireProtocol {
}
@Override
- public void configClientPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
+ public void configClientPipeline(URL url, ChannelOperator operator,
SslContext sslContext) {
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index 299d8abc9d..7911b480fd 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -110,6 +110,7 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
nettyBootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
+ NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch,
getUrl(), getChannelHandler());
final ChannelPipeline pipeline = ch.pipeline();
SslContext sslContext = null;
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
@@ -120,7 +121,9 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
// TODO support IDLE
// int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
pipeline.addLast("connectionHandler", connectionHandler);
- protocol.configClientPipeline(getUrl(), pipeline, sslContext);
+
+ NettyConfigOperator operator = new
NettyConfigOperator(nettyChannel, getChannelHandler());
+ protocol.configClientPipeline(getUrl(), operator, sslContext);
// TODO support Socks5
}
});
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
index 5956175742..7bd7104b77 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
@@ -36,7 +36,7 @@ public class EmptyWireProtocol implements WireProtocol {
}
@Override
- public void configClientPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
+ public void configClientPipeline(URL url, ChannelOperator operator,
SslContext sslContext) {
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 170ad30614..4516f27a24 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -158,7 +158,7 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
}
@Override
- public void configClientPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
+ public void configClientPipeline(URL url, ChannelOperator operator,
SslContext sslContext) {
final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings().headerTableSize(
@@ -175,6 +175,10 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
.build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
new TripleClientHandler(frameworkModel));
- pipeline.addLast(codec, handler, new TripleTailHandler());
+ List<ChannelHandler> handlers = new ArrayList<>();
+ handlers.add(new ChannelHandlerPretender(codec));
+ handlers.add(new ChannelHandlerPretender(handler));
+ handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
+ operator.configChannelHandler(handlers);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index 020cbfc522..a0a9ac16f3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -19,15 +19,9 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
-import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.ChannelHandler;
-import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.connection.ConnectionManager;
-import
org.apache.dubbo.remoting.api.connection.SingleProtocolConnectionManager;
-import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
-import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
-import org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor;
@@ -38,12 +32,10 @@ import org.apache.dubbo.rpc.protocol.tri.support.IGreeter;
import io.netty.channel.Channel;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.HashSet;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import static org.mockito.ArgumentMatchers.any;