This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 48673af7641418d3ae8cd02aa5c341e552036900 Author: zhouxiang <[email protected]> AuthorDate: Wed Nov 9 14:53:42 2022 +0800 [ISSUE #5486] Add remoting server --- .../org/apache/rocketmq/proxy/ProxyStartup.java | 4 + .../rocketmq/proxy/common/ReflectionCache.java | 45 +++ .../apache/rocketmq/proxy/config/ProxyConfig.java | 196 +++++++++++++ .../remoting/MultiProtocolRemotingServer.java | 133 +++++++++ .../proxy/remoting/MultiProtocolTlsHelper.java | 113 ++++++++ .../proxy/remoting/RemotingProtocolServer.java | 304 ++++++++++++++++++++- .../proxy/remoting/protocol/ProtocolHandler.java | 28 ++ .../protocol/ProtocolNegotiationHandler.java | 61 +++++ .../http2proxy/Http2ProtocolProxyHandler.java | 119 ++++++++ .../http2proxy/Http2ProxyBackendHandler.java | 67 +++++ .../http2proxy/Http2ProxyFrontendHandler.java | 78 ++++++ .../protocol/remoting/RemotingProtocolHandler.java | 55 ++++ 12 files changed, 1199 insertions(+), 4 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 78399cf35..42a833430 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication; import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager; import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; @@ -83,6 +84,9 @@ public class ProxyStartup { .build(); PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer); + RemotingProtocolServer remotingServer = new RemotingProtocolServer(messagingProcessor); + PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer); + // start servers one by one. PROXY_START_AND_SHUTDOWN.start(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java new file mode 100644 index 000000000..31fa46c90 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReflectionCache.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.common; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; + +public class ReflectionCache { + private final Cache<Class<?>, Field> fieldCache; + private static final int DEFAULT_MAX_SIZE = 15; + + public ReflectionCache() { + this(DEFAULT_MAX_SIZE); + } + + public ReflectionCache(int maxSize) { + this.fieldCache = CacheBuilder.newBuilder().maximumSize(maxSize).expireAfterAccess(5, TimeUnit.MINUTES).build(); + } + + public Field getDeclaredField(final Class<?> clazz, final String fieldName) throws Exception { + return this.fieldCache.get(clazz, () -> { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return field; + }); + } +} + diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 0efca05b4..b613c191e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -209,6 +209,33 @@ public class ProxyConfig implements ConfigFile { private long channelExpiredTimeout = 1000 * 120; + // remoting + + private boolean enableRemotingLocalProxyGrpc = true; + private int localProxyConnectTimeoutMs = 3000; + private int remotingListenPort = 8080; + + private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER; + private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER; + private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; + private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; + private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER; + private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER; + + private int remotingHeartbeatThreadPoolQueueCapacity = 50000; + private int remotingTopicRouteThreadPoolQueueCapacity = 50000; + private int remotingSendThreadPoolQueueCapacity = 10000; + private int remotingPullThreadPoolQueueCapacity = 50000; + private int remotingUpdateOffsetThreadPoolQueueCapacity = 10000; + private int remotingDefaultThreadPoolQueueCapacity = 50000; + + private long remotingWaitTimeMillsInSendQueue = 3 * 1000; + private long remotingWaitTimeMillsInPullQueue = 5 * 1000; + private long remotingWaitTimeMillsInHeartbeatQueue = 31 * 1000; + private long remotingWaitTimeMillsInUpdateOffsetQueue = 3 * 1000; + private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000; + private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000; + @Override public void initData() { parseDelayLevel(); @@ -1124,7 +1151,176 @@ public class ProxyConfig implements ConfigFile { return channelExpiredTimeout; } + public boolean isEnableRemotingLocalProxyGrpc() { + return enableRemotingLocalProxyGrpc; + } + public void setChannelExpiredTimeout(long channelExpiredTimeout) { this.channelExpiredTimeout = channelExpiredTimeout; } } + + public void setEnableRemotingLocalProxyGrpc(boolean enableRemotingLocalProxyGrpc) { + this.enableRemotingLocalProxyGrpc = enableRemotingLocalProxyGrpc; + } + + public int getLocalProxyConnectTimeoutMs() { + return localProxyConnectTimeoutMs; + } + + public void setLocalProxyConnectTimeoutMs(int localProxyConnectTimeoutMs) { + this.localProxyConnectTimeoutMs = localProxyConnectTimeoutMs; + } + + public int getRemotingListenPort() { + return remotingListenPort; + } + + public void setRemotingListenPort(int remotingListenPort) { + this.remotingListenPort = remotingListenPort; + } + + public int getRemotingHeartbeatThreadPoolNums() { + return remotingHeartbeatThreadPoolNums; + } + + public void setRemotingHeartbeatThreadPoolNums(int remotingHeartbeatThreadPoolNums) { + this.remotingHeartbeatThreadPoolNums = remotingHeartbeatThreadPoolNums; + } + + public int getRemotingTopicRouteThreadPoolNums() { + return remotingTopicRouteThreadPoolNums; + } + + public void setRemotingTopicRouteThreadPoolNums(int remotingTopicRouteThreadPoolNums) { + this.remotingTopicRouteThreadPoolNums = remotingTopicRouteThreadPoolNums; + } + + public int getRemotingSendMessageThreadPoolNums() { + return remotingSendMessageThreadPoolNums; + } + + public void setRemotingSendMessageThreadPoolNums(int remotingSendMessageThreadPoolNums) { + this.remotingSendMessageThreadPoolNums = remotingSendMessageThreadPoolNums; + } + + public int getRemotingPullMessageThreadPoolNums() { + return remotingPullMessageThreadPoolNums; + } + + public void setRemotingPullMessageThreadPoolNums(int remotingPullMessageThreadPoolNums) { + this.remotingPullMessageThreadPoolNums = remotingPullMessageThreadPoolNums; + } + + public int getRemotingUpdateOffsetThreadPoolNums() { + return remotingUpdateOffsetThreadPoolNums; + } + + public void setRemotingUpdateOffsetThreadPoolNums(int remotingUpdateOffsetThreadPoolNums) { + this.remotingUpdateOffsetThreadPoolNums = remotingUpdateOffsetThreadPoolNums; + } + + public int getRemotingDefaultThreadPoolNums() { + return remotingDefaultThreadPoolNums; + } + + public void setRemotingDefaultThreadPoolNums(int remotingDefaultThreadPoolNums) { + this.remotingDefaultThreadPoolNums = remotingDefaultThreadPoolNums; + } + + public int getRemotingHeartbeatThreadPoolQueueCapacity() { + return remotingHeartbeatThreadPoolQueueCapacity; + } + + public void setRemotingHeartbeatThreadPoolQueueCapacity(int remotingHeartbeatThreadPoolQueueCapacity) { + this.remotingHeartbeatThreadPoolQueueCapacity = remotingHeartbeatThreadPoolQueueCapacity; + } + + public int getRemotingTopicRouteThreadPoolQueueCapacity() { + return remotingTopicRouteThreadPoolQueueCapacity; + } + + public void setRemotingTopicRouteThreadPoolQueueCapacity(int remotingTopicRouteThreadPoolQueueCapacity) { + this.remotingTopicRouteThreadPoolQueueCapacity = remotingTopicRouteThreadPoolQueueCapacity; + } + + public int getRemotingSendThreadPoolQueueCapacity() { + return remotingSendThreadPoolQueueCapacity; + } + + public void setRemotingSendThreadPoolQueueCapacity(int remotingSendThreadPoolQueueCapacity) { + this.remotingSendThreadPoolQueueCapacity = remotingSendThreadPoolQueueCapacity; + } + + public int getRemotingPullThreadPoolQueueCapacity() { + return remotingPullThreadPoolQueueCapacity; + } + + public void setRemotingPullThreadPoolQueueCapacity(int remotingPullThreadPoolQueueCapacity) { + this.remotingPullThreadPoolQueueCapacity = remotingPullThreadPoolQueueCapacity; + } + + public int getRemotingUpdateOffsetThreadPoolQueueCapacity() { + return remotingUpdateOffsetThreadPoolQueueCapacity; + } + + public void setRemotingUpdateOffsetThreadPoolQueueCapacity(int remotingUpdateOffsetThreadPoolQueueCapacity) { + this.remotingUpdateOffsetThreadPoolQueueCapacity = remotingUpdateOffsetThreadPoolQueueCapacity; + } + + public int getRemotingDefaultThreadPoolQueueCapacity() { + return remotingDefaultThreadPoolQueueCapacity; + } + + public void setRemotingDefaultThreadPoolQueueCapacity(int remotingDefaultThreadPoolQueueCapacity) { + this.remotingDefaultThreadPoolQueueCapacity = remotingDefaultThreadPoolQueueCapacity; + } + + public long getRemotingWaitTimeMillsInSendQueue() { + return remotingWaitTimeMillsInSendQueue; + } + + public void setRemotingWaitTimeMillsInSendQueue(long remotingWaitTimeMillsInSendQueue) { + this.remotingWaitTimeMillsInSendQueue = remotingWaitTimeMillsInSendQueue; + } + + public long getRemotingWaitTimeMillsInPullQueue() { + return remotingWaitTimeMillsInPullQueue; + } + + public void setRemotingWaitTimeMillsInPullQueue(long remotingWaitTimeMillsInPullQueue) { + this.remotingWaitTimeMillsInPullQueue = remotingWaitTimeMillsInPullQueue; + } + + public long getRemotingWaitTimeMillsInHeartbeatQueue() { + return remotingWaitTimeMillsInHeartbeatQueue; + } + + public void setRemotingWaitTimeMillsInHeartbeatQueue(long remotingWaitTimeMillsInHeartbeatQueue) { + this.remotingWaitTimeMillsInHeartbeatQueue = remotingWaitTimeMillsInHeartbeatQueue; + } + + public long getRemotingWaitTimeMillsInUpdateOffsetQueue() { + return remotingWaitTimeMillsInUpdateOffsetQueue; + } + + public void setRemotingWaitTimeMillsInUpdateOffsetQueue(long remotingWaitTimeMillsInUpdateOffsetQueue) { + this.remotingWaitTimeMillsInUpdateOffsetQueue = remotingWaitTimeMillsInUpdateOffsetQueue; + } + + public long getRemotingWaitTimeMillsInTopicRouteQueue() { + return remotingWaitTimeMillsInTopicRouteQueue; + } + + public void setRemotingWaitTimeMillsInTopicRouteQueue(long remotingWaitTimeMillsInTopicRouteQueue) { + this.remotingWaitTimeMillsInTopicRouteQueue = remotingWaitTimeMillsInTopicRouteQueue; + } + + public long getRemotingWaitTimeMillsInDefaultQueue() { + return remotingWaitTimeMillsInDefaultQueue; + } + + public void setRemotingWaitTimeMillsInDefaultQueue(long remotingWaitTimeMillsInDefaultQueue) { + this.remotingWaitTimeMillsInDefaultQueue = remotingWaitTimeMillsInDefaultQueue; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java new file mode 100644 index 000000000..73aeeaf42 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import java.io.IOException; +import java.lang.reflect.Field; +import java.security.cert.CertificateException; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; +import org.apache.rocketmq.proxy.remoting.protocol.ProtocolNegotiationHandler; +import org.apache.rocketmq.proxy.remoting.protocol.http2proxy.Http2ProtocolProxyHandler; +import org.apache.rocketmq.proxy.remoting.protocol.remoting.RemotingProtocolHandler; +import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.common.TlsMode; +import org.apache.rocketmq.remoting.netty.NettyEncoder; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.TlsSystemConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * for remoting server, if config listen port is 8080 in nettyServerConfig + * <p> + * will + * <li>listen port at 9080 with protocol remoting</li> + * <li>listen port at 8080 with protocol remoting and http2</li> + */ +public class MultiProtocolRemotingServer extends NettyRemotingServer { + + private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final int PORT_DELTA = 1000; + private final NettyServerConfig nettyServerConfig; + private final int port; + + public MultiProtocolRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { + super(nettyServerConfig, channelEventListener); + this.port = nettyServerConfig.getListenPort(); + // to support multiple protocol + // will bind the real port in configChildHandler + // so let parent bind to a useless port + nettyServerConfig.setListenPort(nettyServerConfig.getListenPort() + PORT_DELTA); + this.nettyServerConfig = nettyServerConfig; + } + + @Override + public void loadSslContext() { + TlsMode tlsMode = TlsSystemConfig.tlsMode; + log.info("Server is running in TLS {} mode", tlsMode.getName()); + + if (tlsMode != TlsMode.DISABLED) { + try { + sslContext = MultiProtocolTlsHelper.buildSslContext(); + log.info("SSLContext created for server"); + } catch (CertificateException | IOException e) { + throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create SSLContext for server", e); + } + } + } + + @Override + public void start() { + super.start(); + this.configChildHandler(); + } + + protected void configChildHandler() { + try { + ServerBootstrap serverBootstrap = getField("serverBootstrap", ServerBootstrap.class); + Preconditions.checkNotNull(serverBootstrap); + DefaultEventExecutorGroup defaultEventExecutorGroup = getField("defaultEventExecutorGroup", DefaultEventExecutorGroup.class); + Preconditions.checkNotNull(defaultEventExecutorGroup); + NettyEncoder encoder = getField("encoder", NettyEncoder.class); + Preconditions.checkNotNull(encoder); + ChannelDuplexHandler connectionManageHandler = getField("connectionManageHandler", ChannelDuplexHandler.class); + Preconditions.checkNotNull(connectionManageHandler); + SimpleChannelInboundHandler serverHandler = getField("serverHandler", SimpleChannelInboundHandler.class); + Preconditions.checkNotNull(serverHandler); + SimpleChannelInboundHandler handshakeHandler = getField("handshakeHandler", SimpleChannelInboundHandler.class); + Preconditions.checkNotNull(handshakeHandler); + ConcurrentMap remotingServerTable = getField("remotingServerTable", ConcurrentMap.class); + Preconditions.checkNotNull(remotingServerTable); + + serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast(defaultEventExecutorGroup, "handshakeHandler", handshakeHandler) + .addLast(defaultEventExecutorGroup, + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new ProtocolNegotiationHandler(new RemotingProtocolHandler(encoder, connectionManageHandler, serverHandler)) + .addProtocolHandler(new Http2ProtocolProxyHandler()) + ); + } + }); + remotingServerTable.put(port, this); + serverBootstrap.bind(port).sync(); + } catch (Throwable t) { + throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "config netty child handler failed", t); + } + } + + protected <T> T getField(String name, Class<T> getClazz) throws Throwable { + Field field = NettyRemotingServer.class.getDeclaredField(name); + field.setAccessible(true); + return getClazz.cast(field.get(this)); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java new file mode 100644 index 000000000..54af7bc9e --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting; + +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.cert.CertificateException; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.netty.TlsHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerAuthClient; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerCertPath; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPassword; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerKeyPath; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerNeedClientAuth; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerTrustCertPath; +import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsTestModeEnable; + +public class MultiProtocolTlsHelper extends TlsHelper { + private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + private static final DecryptionStrategy DECRYPTION_STRATEGY = (privateKeyEncryptPath, forClient) -> new FileInputStream(privateKeyEncryptPath); + + public static SslContext buildSslContext() throws IOException, CertificateException { + TlsHelper.buildSslContext(false); + SslProvider provider; + if (OpenSsl.isAvailable()) { + provider = SslProvider.OPENSSL; + log.info("Using OpenSSL provider"); + } else { + provider = SslProvider.JDK; + log.info("Using JDK SSL provider"); + } + + SslContextBuilder sslContextBuilder = null; + if (tlsTestModeEnable) { + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + sslContextBuilder = SslContextBuilder + .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()) + .sslProvider(SslProvider.OPENSSL) + .clientAuth(ClientAuth.OPTIONAL); + } else { + sslContextBuilder = SslContextBuilder.forServer( + !StringUtils.isBlank(tlsServerCertPath) ? Files.newInputStream(Paths.get(tlsServerCertPath)) : null, + !StringUtils.isBlank(tlsServerKeyPath) ? DECRYPTION_STRATEGY.decryptPrivateKey(tlsServerKeyPath, false) : null, + !StringUtils.isBlank(tlsServerKeyPassword) ? tlsServerKeyPassword : null) + .sslProvider(provider); + + if (!tlsServerAuthClient) { + sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); + } else { + if (!StringUtils.isBlank(tlsServerTrustCertPath)) { + sslContextBuilder.trustManager(new File(tlsServerTrustCertPath)); + } + } + + sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth)); + } + + sslContextBuilder.applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers. + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers. + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)); + + return sslContextBuilder.build(); + } + + private static ClientAuth parseClientAuthMode(String authMode) { + if (null == authMode || authMode.trim().isEmpty()) { + return ClientAuth.NONE; + } + + for (ClientAuth clientAuth : ClientAuth.values()) { + if (clientAuth.name().equals(authMode.toUpperCase())) { + return clientAuth; + } + } + + return ClientAuth.NONE; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index 58b257641..fdf1870a5 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -18,20 +18,161 @@ package org.apache.rocketmq.proxy.remoting; import io.netty.channel.Channel; +import java.lang.reflect.Field; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.latency.FutureTaskExt; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor; +import org.apache.rocketmq.proxy.common.ReflectionCache; import org.apache.rocketmq.proxy.common.StartAndShutdown; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.remoting.activity.AckMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.ChangeInvisibleTimeActivity; +import org.apache.rocketmq.proxy.remoting.activity.ClientManagerActivity; +import org.apache.rocketmq.proxy.remoting.activity.ConsumerManagerActivity; +import org.apache.rocketmq.proxy.remoting.activity.GetTopicRouteActivity; +import org.apache.rocketmq.proxy.remoting.activity.PopMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; +import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; +import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; +import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; +import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.netty.NettyRemotingServer; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.apache.rocketmq.remoting.netty.TlsSystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient { + private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - private final MessagingProcessor messagingProcessor; - private RemotingServer defaultRemotingServer; + protected final MessagingProcessor messagingProcessor; + protected final RemotingChannelManager remotingChannelManager; + protected final ChannelEventListener clientHousekeepingService; + protected final RemotingServer defaultRemotingServer; + protected final GetTopicRouteActivity getTopicRouteActivity; + protected final ClientManagerActivity clientManagerActivity; + protected final ConsumerManagerActivity consumerManagerActivity; + protected final SendMessageActivity sendMessageActivity; + protected final PullMessageActivity pullMessageActivity; + protected final PopMessageActivity popMessageActivity; + protected final AckMessageActivity ackMessageActivity; + protected final ChangeInvisibleTimeActivity changeInvisibleTimeActivity; + protected final ThreadPoolExecutor sendMessageExecutor; + protected final ThreadPoolExecutor pullMessageExecutor; + protected final ThreadPoolExecutor heartbeatExecutor; + protected final ThreadPoolExecutor updateOffsetExecutor; + protected final ThreadPoolExecutor topicRouteExecutor; + protected final ThreadPoolExecutor defaultExecutor; + + private final ReflectionCache reflectionCache = new ReflectionCache(); public RemotingProtocolServer(MessagingProcessor messagingProcessor) { this.messagingProcessor = messagingProcessor; + this.remotingChannelManager = new RemotingChannelManager(this, messagingProcessor.getProxyRelayService()); + + RequestPipeline pipeline = createRequestPipeline(); + this.getTopicRouteActivity = new GetTopicRouteActivity(pipeline, messagingProcessor); + this.clientManagerActivity = new ClientManagerActivity(pipeline, messagingProcessor, remotingChannelManager); + this.consumerManagerActivity = new ConsumerManagerActivity(pipeline, messagingProcessor); + this.sendMessageActivity = new SendMessageActivity(pipeline, messagingProcessor); + this.pullMessageActivity = new PullMessageActivity(pipeline, messagingProcessor); + this.popMessageActivity = new PopMessageActivity(pipeline, messagingProcessor); + this.ackMessageActivity = new AckMessageActivity(pipeline, messagingProcessor); + this.changeInvisibleTimeActivity = new ChangeInvisibleTimeActivity(pipeline, messagingProcessor); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + NettyServerConfig defaultServerConfig = new NettyServerConfig(); + defaultServerConfig.setListenPort(config.getRemotingListenPort()); + TlsSystemConfig.tlsTestModeEnable = false; + System.setProperty(TlsSystemConfig.TLS_TEST_MODE_ENABLE, "false"); + TlsSystemConfig.tlsServerCertPath = config.getGrpcTlsCertPath(); + System.setProperty(TlsSystemConfig.TLS_SERVER_CERTPATH, config.getGrpcTlsCertPath()); + TlsSystemConfig.tlsServerKeyPath = config.getGrpcTlsKeyPath(); + System.setProperty(TlsSystemConfig.TLS_SERVER_KEYPATH, config.getGrpcTlsKeyPath()); + + this.clientHousekeepingService = new ClientHousekeepingService(this.clientManagerActivity); + + if (config.isEnableRemotingLocalProxyGrpc()) { + this.defaultRemotingServer = new MultiProtocolRemotingServer(defaultServerConfig, this.clientHousekeepingService); + } else { + this.defaultRemotingServer = new NettyRemotingServer(defaultServerConfig, this.clientHousekeepingService); + } + this.registerRemotingServer(this.defaultRemotingServer); + + this.sendMessageExecutor = ThreadPoolMonitor.createAndMonitor( + config.getRemotingSendMessageThreadPoolNums(), + config.getRemotingSendMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + "RemotingSendMessageThread", + config.getRemotingSendThreadPoolQueueCapacity(), + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInSendQueue()) + ); + + this.pullMessageExecutor = ThreadPoolMonitor.createAndMonitor( + config.getRemotingPullMessageThreadPoolNums(), + config.getRemotingPullMessageThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + "RemotingPullMessageThread", + config.getRemotingPullThreadPoolQueueCapacity(), + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInPullQueue()) + ); + + this.updateOffsetExecutor = ThreadPoolMonitor.createAndMonitor( + config.getRemotingUpdateOffsetThreadPoolNums(), + config.getRemotingUpdateOffsetThreadPoolNums(), + 1, + TimeUnit.MINUTES, + "RemotingUpdateOffsetThread", + config.getRemotingUpdateOffsetThreadPoolQueueCapacity(), + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInUpdateOffsetQueue()) + ); + + this.heartbeatExecutor = ThreadPoolMonitor.createAndMonitor( + config.getRemotingHeartbeatThreadPoolNums(), + config.getRemotingHeartbeatThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + "RemotingHeartbeatThread", + config.getRemotingHeartbeatThreadPoolQueueCapacity(), + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInHeartbeatQueue()) + ); + + this.topicRouteExecutor = ThreadPoolMonitor.createAndMonitor( + config.getRemotingTopicRouteThreadPoolNums(), + config.getRemotingTopicRouteThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + "RemotingTopicRouteThread", + config.getRemotingTopicRouteThreadPoolQueueCapacity(), + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInTopicRouteQueue()) + ); + + this.defaultExecutor = ThreadPoolMonitor.createAndMonitor( + config.getRemotingDefaultThreadPoolNums(), + config.getRemotingDefaultThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + "RemotingDefaultThread", + config.getRemotingDefaultThreadPoolQueueCapacity(), + new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue()) + ); } protected void init() { @@ -39,17 +180,50 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu } protected void registerRemotingServer(RemotingServer remotingServer) { + remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageActivity, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageActivity, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageActivity, this.sendMessageExecutor); + remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageActivity, sendMessageExecutor); + + remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManagerActivity, this.heartbeatExecutor); + remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManagerActivity, this.defaultExecutor); + + remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageActivity, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, pullMessageActivity, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.POP_MESSAGE, pullMessageActivity, this.pullMessageExecutor); + remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManagerActivity, this.updateOffsetExecutor); + remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, consumerManagerActivity, this.updateOffsetExecutor); + remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, consumerManagerActivity, this.updateOffsetExecutor); + + remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManagerActivity, this.defaultExecutor); + remotingServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManagerActivity, this.defaultExecutor); + + remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, getTopicRouteActivity, this.topicRouteExecutor); } @Override public void shutdown() throws Exception { - + this.defaultRemotingServer.shutdown(); + this.remotingChannelManager.shutdown(); + this.sendMessageExecutor.shutdown(); + this.pullMessageExecutor.shutdown(); + this.heartbeatExecutor.shutdown(); + this.updateOffsetExecutor.shutdown(); + this.topicRouteExecutor.shutdown(); + this.defaultExecutor.shutdown(); } @Override public void start() throws Exception { - + this.remotingChannelManager.start(); + this.defaultRemotingServer.start(); } @Override @@ -69,4 +243,126 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu } return future; } + + protected RequestPipeline createRequestPipeline() { + RequestPipeline pipeline = (ctx, request, context) -> { + }; + + // add pipeline + // the last pipe add will execute at the first + return pipeline; + } + + protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor { + + private final long maxWaitTimeMillsInQueue; + + public ThreadPoolHeadSlowTimeMillsMonitor(long maxWaitTimeMillsInQueue) { + this.maxWaitTimeMillsInQueue = maxWaitTimeMillsInQueue; + } + + @Override + public String describe() { + return "headSlow"; + } + + @Override + public double value(ThreadPoolExecutor executor) { + return headSlowTimeMills(executor.getQueue()); + } + + @Override + public boolean needPrintJstack(ThreadPoolExecutor executor, double value) { + return value > maxWaitTimeMillsInQueue; + } + } + + protected long headSlowTimeMills(BlockingQueue<Runnable> q) { + try { + long slowTimeMills = 0; + final Runnable peek = q.peek(); + if (peek != null) { + RequestTask rt = castRunnable(peek); + slowTimeMills = rt == null ? 0 : System.currentTimeMillis() - rt.getCreateTimestamp(); + } + + if (slowTimeMills < 0) { + slowTimeMills = 0; + } + + return slowTimeMills; + } catch (Exception e) { + log.error("error when headSlowTimeMills.", e); + } + return -1; + } + + protected void cleanExpireRequest() { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + + cleanExpiredRequestInQueue(this.sendMessageExecutor, config.getRemotingWaitTimeMillsInSendQueue()); + cleanExpiredRequestInQueue(this.pullMessageExecutor, config.getRemotingWaitTimeMillsInPullQueue()); + cleanExpiredRequestInQueue(this.heartbeatExecutor, config.getRemotingWaitTimeMillsInHeartbeatQueue()); + cleanExpiredRequestInQueue(this.updateOffsetExecutor, config.getRemotingWaitTimeMillsInUpdateOffsetQueue()); + cleanExpiredRequestInQueue(this.topicRouteExecutor, config.getRemotingWaitTimeMillsInTopicRouteQueue()); + cleanExpiredRequestInQueue(this.defaultExecutor, config.getRemotingWaitTimeMillsInDefaultQueue()); + } + + protected void cleanExpiredRequestInQueue(ThreadPoolExecutor threadPoolExecutor, long maxWaitTimeMillsInQueue) { + while (true) { + try { + BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue(); + if (!blockingQueue.isEmpty()) { + final Runnable runnable = blockingQueue.peek(); + if (null == runnable) { + break; + } + final RequestTask rt = castRunnable(runnable); + if (rt == null || rt.isStopRun()) { + break; + } + + final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); + if (behind >= maxWaitTimeMillsInQueue) { + if (blockingQueue.remove(runnable)) { + rt.setStopRun(true); + rt.returnResponse(ResponseCode.SYSTEM_BUSY, + String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size())); + } + } else { + break; + } + } else { + break; + } + } catch (Throwable ignored) { + } + } + } + + private RequestTask castRunnable(final Runnable runnable) { + try { + if (runnable instanceof FutureTask) { + Field callableField = reflectionCache.getDeclaredField(FutureTask.class, "callable"); + Callable callable = (Callable) callableField.get(runnable); + if (callable == null) { + return null; + } + Field taskField = reflectionCache.getDeclaredField(callable.getClass(), "task"); + if (taskField == null) { + log.warn("get task from FutureTask failed. class:{}", runnable.getClass().getName()); + return null; + } + return (RequestTask) taskField.get(callable); + } else if (runnable instanceof FutureTaskExt) { + FutureTaskExt futureTaskExt = (FutureTaskExt) runnable; + return (RequestTask) futureTaskExt.getRunnable(); + } + return null; + } catch (Throwable e) { + log.error("castRunnable exception. class:{}", runnable.getClass().getName(), e); + } + + return null; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java new file mode 100644 index 000000000..4b1b03067 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +public interface ProtocolHandler { + + boolean match(ByteBuf msg); + + void config(final ChannelHandlerContext ctx, final ByteBuf msg); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java new file mode 100644 index 000000000..da2dded5f --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/ProtocolNegotiationHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.proxy.remoting.protocol; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.util.ArrayList; +import java.util.List; + +public class ProtocolNegotiationHandler extends ByteToMessageDecoder { + + private final List<ProtocolHandler> protocolHandlerList = new ArrayList<ProtocolHandler>(); + private final ProtocolHandler fallbackProtocolHandler; + + public ProtocolNegotiationHandler(ProtocolHandler fallbackProtocolHandler) { + this.fallbackProtocolHandler = fallbackProtocolHandler; + } + + public ProtocolNegotiationHandler addProtocolHandler(ProtocolHandler protocolHandler) { + protocolHandlerList.add(protocolHandler); + return this; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + // use 4 bytes to judge protocol + if (in.readableBytes() < 4) { + return; + } + + ProtocolHandler protocolHandler = null; + for (ProtocolHandler curProtocolHandler : protocolHandlerList) { + if (curProtocolHandler.match(in)) { + protocolHandler = curProtocolHandler; + break; + } + } + + if (protocolHandler == null) { + protocolHandler = fallbackProtocolHandler; + } + + protocolHandler.config(ctx, in); + ctx.pipeline().remove(this); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java new file mode 100644 index 000000000..c5050cda7 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import javax.net.ssl.SSLException; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class Http2ProtocolProxyHandler implements ProtocolHandler { + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private static final String LOCAL_HOST = "127.0.0.1"; + /** + * The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol + * which start with "PRI " too in the future + * <p> + * The full HTTP/2 connection preface is "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + * <p> + * ref: https://datatracker.ietf.org/doc/html/rfc7540#section-3.5 + */ + private static final int PRI_INT = 0x50524920; + + private final SslContext sslContext; + + public Http2ProtocolProxyHandler() { + try { + sslContext = SslContextBuilder + .forClient() + .sslProvider(SslProvider.OPENSSL) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2)) + .build(); + } catch (SSLException e) { + log.error("Failed to create SSLContext for Http2ProtocolProxyHandler", e); + throw new RuntimeException("Failed to create SSLContext for Http2ProtocolProxyHandler", e); + } + } + + @Override + public boolean match(ByteBuf in) { + if (!ConfigurationManager.getProxyConfig().isEnableRemotingLocalProxyGrpc()) { + return false; + } + + // If starts with 'PRI ' + return in.getInt(in.readerIndex()) == PRI_INT; + } + + @Override + public void config(final ChannelHandlerContext ctx, final ByteBuf msg) { + // proxy channel to http2 server + final Channel inboundChannel = ctx.channel(); + + ProxyConfig config = ConfigurationManager.getProxyConfig(); + // Start the connection attempt. + Bootstrap b = new Bootstrap(); + b.group(inboundChannel.eventLoop()) + .channel(ctx.channel().getClass()) + .handler(new ChannelInitializer<Channel>() { + @Override + protected void initChannel(Channel ch) throws Exception { + if (sslContext != null) { + ch.pipeline() + .addLast(sslContext.newHandler(ch.alloc(), LOCAL_HOST, config.getGrpcServerPort())); + } + ch.pipeline().addLast(new Http2ProxyBackendHandler(inboundChannel)); + } + }) + .option(ChannelOption.AUTO_READ, false) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getLocalProxyConnectTimeoutMs()); + ChannelFuture f; + try { + f = b.connect(LOCAL_HOST, config.getGrpcServerPort()).sync(); + } catch (Exception e) { + log.error("connect http2 server failed. port:{}", config.getGrpcServerPort(), e); + inboundChannel.close(); + return; + } + + final Channel outboundChannel = f.channel(); + + ctx.pipeline().addLast(new Http2ProxyFrontendHandler(outboundChannel)); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java new file mode 100644 index 000000000..53bddfc31 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter { + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + + private final Channel inboundChannel; + + public Http2ProxyBackendHandler(Channel inboundChannel) { + this.inboundChannel = inboundChannel; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.read(); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + ctx.channel().read(); + } else { + future.channel().close(); + } + } + }); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + Http2ProxyFrontendHandler.closeOnFlush(inboundChannel); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("Http2ProxyBackendHandler#exceptionCaught", cause); + Http2ProxyFrontendHandler.closeOnFlush(ctx.channel()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java new file mode 100644 index 000000000..8bffdc6d0 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol.http2proxy; + +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter { + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + // As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as + // the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel. + private final Channel outboundChannel; + + public Http2ProxyFrontendHandler(final Channel outboundChannel) { + this.outboundChannel = outboundChannel; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + if (outboundChannel.isActive()) { + outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + // was able to flush out data, start to read the next chunk + ctx.channel().read(); + } else { + future.channel().close(); + } + } + }); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + if (outboundChannel != null) { + closeOnFlush(outboundChannel); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("Http2ProxyFrontendHandler#exceptionCaught", cause); + closeOnFlush(ctx.channel()); + } + + /** + * Closes the specified channel after all queued write requests are flushed. + */ + static void closeOnFlush(Channel ch) { + if (ch.isActive()) { + ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java new file mode 100644 index 000000000..3e4cc7c04 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.protocol.remoting; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler; +import org.apache.rocketmq.remoting.netty.NettyDecoder; +import org.apache.rocketmq.remoting.netty.NettyEncoder; + +public class RemotingProtocolHandler implements ProtocolHandler { + + private final NettyEncoder encoder; + private final ChannelDuplexHandler connectionManageHandler; + private final SimpleChannelInboundHandler serverHandler; + + public RemotingProtocolHandler(NettyEncoder encoder, ChannelDuplexHandler connectionManageHandler, + SimpleChannelInboundHandler serverHandler) { + this.encoder = encoder; + this.connectionManageHandler = connectionManageHandler; + this.serverHandler = serverHandler; + } + + @Override + public boolean match(ByteBuf in) { + return true; + } + + @Override + public void config(ChannelHandlerContext ctx, ByteBuf msg) { + ctx.pipeline().addLast( + this.encoder, + new NettyDecoder(), + this.connectionManageHandler, + this.serverHandler + ); + } +}
