This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4c0e06749bb664f27a9726b9a37827453f96fba8 Author: kaiyi.lk <[email protected]> AuthorDate: Wed Nov 9 16:44:03 2022 +0800 [ISSUE #5486] polish MultiProtocolRemotingServer --- .../rocketmq/proxy/common/ReflectionCache.java | 45 ------------ .../remoting/MultiProtocolRemotingServer.java | 81 ++++------------------ .../http2proxy/Http2ProtocolProxyHandler.java | 2 +- .../protocol/remoting/RemotingProtocolHandler.java | 17 +++-- .../remoting/netty/NettyRemotingServer.java | 68 +++++++++++++----- 5 files changed, 78 insertions(+), 135 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java deleted file mode 100644 index 31fa46c90..000000000 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.proxy.common; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.lang.reflect.Field; -import java.util.concurrent.TimeUnit; - -public class ReflectionCache { - private final Cache<Class<?>, Field> fieldCache; - private static final int DEFAULT_MAX_SIZE = 15; - - public ReflectionCache() { - this(DEFAULT_MAX_SIZE); - } - - public ReflectionCache(int maxSize) { - this.fieldCache = CacheBuilder.newBuilder().maximumSize(maxSize).expireAfterAccess(5, TimeUnit.MINUTES).build(); - } - - public Field getDeclaredField(final Class<?> clazz, final String fieldName) throws Exception { - return this.fieldCache.get(clazz, () -> { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - return field; - }); - } -} - diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java index 73aeeaf42..02e3a545e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java @@ -17,18 +17,11 @@ package org.apache.rocketmq.proxy.remoting; -import com.google.common.base.Preconditions; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.io.IOException; -import java.lang.reflect.Field; import java.security.cert.CertificateException; -import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; @@ -37,7 +30,6 @@ import org.apache.rocketmq.proxy.remoting.protocol.http2proxy.Http2ProtocolProxy import org.apache.rocketmq.proxy.remoting.protocol.remoting.RemotingProtocolHandler; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.common.TlsMode; -import org.apache.rocketmq.remoting.netty.NettyEncoder; import org.apache.rocketmq.remoting.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.TlsSystemConfig; @@ -45,26 +37,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * for remoting server, if config listen port is 8080 in nettyServerConfig - * <p> - * will - * <li>listen port at 9080 with protocol remoting</li> - * <li>listen port at 8080 with protocol remoting and http2</li> + * support remoting and http2 protocol at one port */ public class MultiProtocolRemotingServer extends NettyRemotingServer { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - private static final int PORT_DELTA = 1000; private final NettyServerConfig nettyServerConfig; - private final int port; public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { super(nettyServerConfig, channelEventListener); - this.port = nettyServerConfig.getListenPort(); - // to support multiple protocol - // will bind the real port in configChildHandler - // so let parent bind to a useless port - nettyServerConfig.setListenPort(nettyServerConfig.getListenPort() + PORT_DELTA); this.nettyServerConfig = nettyServerConfig; } @@ -84,50 +65,18 @@ public class MultiProtocolRemotingServer extends NettyRemotingServer { } @Override - public void start() { - super.start(); - this.configChildHandler(); - } - - protected void configChildHandler() { - try { - ServerBootstrap serverBootstrap = getField("serverBootstrap", ServerBootstrap.class); - Preconditions.checkNotNull(serverBootstrap); - DefaultEventExecutorGroup defaultEventExecutorGroup = getField("defaultEventExecutorGroup", DefaultEventExecutorGroup.class); - Preconditions.checkNotNull(defaultEventExecutorGroup); - NettyEncoder encoder = getField("encoder", NettyEncoder.class); - Preconditions.checkNotNull(encoder); - ChannelDuplexHandler connectionManageHandler = getField("connectionManageHandler", ChannelDuplexHandler.class); - Preconditions.checkNotNull(connectionManageHandler); - SimpleChannelInboundHandler serverHandler = getField("serverHandler", SimpleChannelInboundHandler.class); - Preconditions.checkNotNull(serverHandler); - SimpleChannelInboundHandler handshakeHandler = getField("handshakeHandler", SimpleChannelInboundHandler.class); - Preconditions.checkNotNull(handshakeHandler); - ConcurrentMap remotingServerTable = getField("remotingServerTable", ConcurrentMap.class); - Preconditions.checkNotNull(remotingServerTable); - - serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) { - ch.pipeline() - .addLast(defaultEventExecutorGroup, "handshakeHandler", handshakeHandler) - .addLast(defaultEventExecutorGroup, - new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new ProtocolNegotiationHandler(new RemotingProtocolHandler(encoder, connectionManageHandler, serverHandler)) - .addProtocolHandler(new Http2ProtocolProxyHandler()) - ); - } - }); - remotingServerTable.put(port, this); - serverBootstrap.bind(port).sync(); - } catch (Throwable t) { - throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", t); - } - } - - protected <T> T getField(String name, Class<T> getClazz) throws Throwable { - Field field = NettyRemotingServer.class.getDeclaredField(name); - field.setAccessible(true); - return getClazz.cast(field.get(this)); + protected ChannelPipeline configChannel(SocketChannel ch) { + return ch.pipeline() + .addLast(this.getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, this.getHandshakeHandler()) + .addLast(this.getDefaultEventExecutorGroup(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new ProtocolNegotiationHandler( + new RemotingProtocolHandler( + this.getEncoder(), + this.getDistributionHandler(), + this.getConnectionManageHandler(), + this.getServerHandler())) + .addProtocolHandler(new Http2ProtocolProxyHandler()) + ); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java index c5050cda7..86f1ee921 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java @@ -43,7 +43,7 @@ public class Http2ProtocolProxyHandler implements ProtocolHandler { private static final String LOCAL_HOST = "127.0.0.1"; /** * The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol - * which start with "PRI " too in the future + * which start with "PRI " in the future * <p> * The full HTTP/2 connection preface is "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" * <p> diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java index 3e4cc7c04..2d1a04d0e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java @@ -18,22 +18,26 @@ package org.apache.rocketmq.proxy.remoting.protocol.remoting; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler; import org.apache.rocketmq.remoting.netty.NettyDecoder; import org.apache.rocketmq.remoting.netty.NettyEncoder; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.RemotingCodeDistributionHandler; public class RemotingProtocolHandler implements ProtocolHandler { private final NettyEncoder encoder; - private final ChannelDuplexHandler connectionManageHandler; - private final SimpleChannelInboundHandler serverHandler; + private final RemotingCodeDistributionHandler remotingCodeDistributionHandler; + private final NettyRemotingServer.NettyConnectManageHandler connectionManageHandler; + private final NettyRemotingServer.NettyServerHandler serverHandler; - public RemotingProtocolHandler(NettyEncoder encoder, ChannelDuplexHandler connectionManageHandler, - SimpleChannelInboundHandler serverHandler) { + public RemotingProtocolHandler(NettyEncoder encoder, + RemotingCodeDistributionHandler remotingCodeDistributionHandler, + NettyRemotingServer.NettyConnectManageHandler connectionManageHandler, + NettyRemotingServer.NettyServerHandler serverHandler) { this.encoder = encoder; + this.remotingCodeDistributionHandler = remotingCodeDistributionHandler; this.connectionManageHandler = connectionManageHandler; this.serverHandler = serverHandler; } @@ -48,6 +52,7 @@ public class RemotingProtocolHandler implements ProtocolHandler { ctx.pipeline().addLast( this.encoder, new NettyDecoder(), + this.remotingCodeDistributionHandler, this.connectionManageHandler, this.serverHandler ); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 1b364b6ee..646c0734e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.WriteBufferWaterMark; @@ -93,9 +94,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti */ private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<>(); - private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; - private static final String TLS_HANDLER_NAME = "sslHandler"; - private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + public static final String TLS_HANDLER_NAME = "sslHandler"; + public static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; // sharable handlers private HandshakeHandler handshakeHandler; @@ -242,17 +243,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { - ch.pipeline() - .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) - .addLast(defaultEventExecutorGroup, - encoder, - new NettyDecoder(), - distributionHandler, - new IdleStateHandler(0, 0, - nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - connectionManageHandler, - serverHandler - ); + configChannel(ch); } }); @@ -297,6 +288,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti }, 1, 1, TimeUnit.SECONDS); } + /** + * config channel in ChannelInitializer + * @param ch the SocketChannel needed to init + * @return the initialized ChannelPipeline, sub class can use it to extent in the future + */ + protected ChannelPipeline configChannel(SocketChannel ch) { + return ch.pipeline() + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) + .addLast(defaultEventExecutorGroup, + encoder, + new NettyDecoder(), + distributionHandler, + new IdleStateHandler(0, 0, + nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + connectionManageHandler, + serverHandler + ); + } + private void addCustomConfig(ServerBootstrap childHandler) { if (nettyServerConfig.getServerSocketSndBufSize() > 0) { log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize()); @@ -438,8 +448,32 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } + public DefaultEventExecutorGroup getDefaultEventExecutorGroup() { + return defaultEventExecutorGroup; + } + + public HandshakeHandler getHandshakeHandler() { + return handshakeHandler; + } + + public NettyEncoder getEncoder() { + return encoder; + } + + public NettyConnectManageHandler getConnectionManageHandler() { + return connectionManageHandler; + } + + public NettyServerHandler getServerHandler() { + return serverHandler; + } + + public RemotingCodeDistributionHandler getDistributionHandler() { + return distributionHandler; + } + @ChannelHandler.Sharable - class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { + public class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { private final TlsMode tlsMode; @@ -496,7 +530,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @ChannelHandler.Sharable - class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { + public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) { @@ -529,7 +563,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @ChannelHandler.Sharable - class NettyConnectManageHandler extends ChannelDuplexHandler { + public class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
