This is an automated email from the ASF dual-hosted git repository.

crazyhzm pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 734c9c3f4a feat: decoupling WireProtocol by netty (#10976)
734c9c3f4a is described below

commit 734c9c3f4a08c68c1ef5fce9c7c879423d1b8780
Author: conghuhu <[email protected]>
AuthorDate: Tue Nov 22 11:27:08 2022 +0800

    feat: decoupling WireProtocol by netty (#10976)
---
 .../src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java   | 9 ---------
 .../java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java | 4 ++--
 .../main/java/org/apache/dubbo/remoting/api/WireProtocol.java    | 3 +--
 .../test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java   | 3 +--
 .../dubbo/remoting/transport/netty4/NettyConnectionClient.java   | 5 ++++-
 .../dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java   | 2 +-
 .../org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java   | 8 ++++++--
 .../org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java     | 8 --------
 8 files changed, 15 insertions(+), 27 deletions(-)

diff --git 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
index 71e6633eb3..bd4bc2db1e 100644
--- 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
+++ 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
@@ -27,9 +27,6 @@ import org.apache.dubbo.remoting.api.pu.ChannelOperator;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.ScopeModelAware;
 
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.ssl.SslContext;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -54,10 +51,4 @@ public class QosWireProtocol extends AbstractWireProtocol 
implements ScopeModelA
         operator.configChannelHandler(handlers);
     }
 
-
-    @Override
-    public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
-
-    }
-
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
index 278712bcac..621b5e45dd 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
@@ -17,8 +17,8 @@
 package org.apache.dubbo.remoting.api;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.api.pu.ChannelOperator;
 
-import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslContext;
 
 public abstract class AbstractWireProtocol implements WireProtocol {
@@ -35,7 +35,7 @@ public abstract class AbstractWireProtocol implements 
WireProtocol {
     }
 
     @Override
-    public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
+    public void configClientPipeline(URL url, ChannelOperator operator, 
SslContext sslContext) {
 
     }
 
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
index fe615f8210..f64b5fa6bd 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java
@@ -21,7 +21,6 @@ import org.apache.dubbo.common.extension.ExtensionScope;
 import org.apache.dubbo.common.extension.SPI;
 import org.apache.dubbo.remoting.api.pu.ChannelOperator;
 
-import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslContext;
 
 @SPI(scope = ExtensionScope.FRAMEWORK)
@@ -31,7 +30,7 @@ public interface WireProtocol {
 
     void configServerProtocolHandler(URL url, ChannelOperator operator);
 
-    void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext 
sslContext);
+    void configClientPipeline(URL url, ChannelOperator operator, SslContext 
sslContext);
 
     void close();
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
index 5c69d9b945..bfb87e77c1 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.api;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.remoting.api.pu.ChannelOperator;
 
-import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslContext;
 
 public class EmptyProtocol implements WireProtocol {
@@ -34,7 +33,7 @@ public class EmptyProtocol implements WireProtocol {
     }
 
     @Override
-    public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
+    public void configClientPipeline(URL url, ChannelOperator operator, 
SslContext sslContext) {
 
     }
 
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index 299d8abc9d..7911b480fd 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -110,6 +110,7 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
         nettyBootstrap.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
+                NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch, 
getUrl(), getChannelHandler());
                 final ChannelPipeline pipeline = ch.pipeline();
                 SslContext sslContext = null;
                 if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
@@ -120,7 +121,9 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
                 // TODO support IDLE
 //                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                 pipeline.addLast("connectionHandler", connectionHandler);
-                protocol.configClientPipeline(getUrl(), pipeline, sslContext);
+
+                NettyConfigOperator operator = new 
NettyConfigOperator(nettyChannel, getChannelHandler());
+                protocol.configClientPipeline(getUrl(), operator, sslContext);
                 // TODO support Socks5
             }
         });
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
index 5956175742..7bd7104b77 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java
@@ -36,7 +36,7 @@ public class EmptyWireProtocol implements WireProtocol {
     }
 
     @Override
-    public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
+    public void configClientPipeline(URL url, ChannelOperator operator, 
SslContext sslContext) {
 
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 170ad30614..4516f27a24 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -158,7 +158,7 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
     }
 
     @Override
-    public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
+    public void configClientPipeline(URL url, ChannelOperator operator, 
SslContext sslContext) {
         final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
             .gracefulShutdownTimeoutMillis(10000)
             .initialSettings(new Http2Settings().headerTableSize(
@@ -175,6 +175,10 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
             .build();
         final Http2MultiplexHandler handler = new Http2MultiplexHandler(
             new TripleClientHandler(frameworkModel));
-        pipeline.addLast(codec, handler, new TripleTailHandler());
+        List<ChannelHandler> handlers = new ArrayList<>();
+        handlers.add(new ChannelHandlerPretender(codec));
+        handlers.add(new ChannelHandlerPretender(handler));
+        handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
+        operator.configChannelHandler(handlers);
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
index 020cbfc522..a0a9ac16f3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java
@@ -19,15 +19,9 @@ package org.apache.dubbo.rpc.protocol.tri;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
-import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.ChannelHandler;
-import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.remoting.api.connection.ConnectionManager;
-import 
org.apache.dubbo.remoting.api.connection.SingleProtocolConnectionManager;
-import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
-import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
-import org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor;
@@ -38,12 +32,10 @@ import org.apache.dubbo.rpc.protocol.tri.support.IGreeter;
 
 import io.netty.channel.Channel;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.util.HashSet;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import static org.mockito.ArgumentMatchers.any;

Reply via email to