This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 449dfa8649b802b753c4033f437ca4a89aef3569 Author: yukon <[email protected]> AuthorDate: Wed May 15 16:19:07 2019 +0800 Remove multi-protocol support feature, only support RemotingCommand protocol --- .../rocketmq/remoting/api/RemotingMarshaller.java | 3 - .../rocketmq/remoting/api/protocol/Protocol.java | 39 ------ .../remoting/api/protocol/ProtocolFactory.java | 30 ----- .../common/RemotingCommandFactoryMeta.java | 49 -------- .../rocketmq/remoting/config/RemotingConfig.java | 14 +-- .../impl/command/RemotingCommandFactoryImpl.java | 18 +-- .../remoting/impl/netty/NettyRemotingAbstract.java | 15 +-- .../remoting/impl/netty/NettyRemotingClient.java | 36 +----- .../remoting/impl/netty/NettyRemotingServer.java | 35 +----- .../remoting/impl/netty/handler/Http2Handler.java | 139 --------------------- .../impl/netty/handler/ProtocolSelector.java | 65 ---------- .../remoting/impl/protocol/Httpv2Protocol.java | 52 -------- .../impl/protocol/ProtocolFactoryImpl.java | 83 ------------ .../impl/protocol/RemotingCoreProtocol.java | 46 ------- .../remoting/impl/protocol/WebSocketProtocol.java | 38 ------ 15 files changed, 21 insertions(+), 641 deletions(-) diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java index 0386a03..62c9dda 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingMarshaller.java @@ -17,11 +17,8 @@ package org.apache.rocketmq.remoting.api; -import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; public interface RemotingMarshaller { - ProtocolFactory protocolFactory(); - SerializerFactory serializerFactory(); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java deleted file mode 100644 index 5caf167..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.api.protocol; - -import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; - -public interface Protocol { - /** - * Minimum Viable Protocol - */ - String MVP = "mvp"; - String HTTP2 = "http2"; - String WEBSOCKET = "websocket"; - - byte MVP_MAGIC = 0x14; - byte WEBSOCKET_MAGIC = 0x15; - byte HTTP_2_MAGIC = 0x16; - - String name(); - - byte type(); - - void assembleHandler(ChannelHandlerContextWrapper ctx); -} diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java deleted file mode 100644 index cf016f9..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.api.protocol; - -public interface ProtocolFactory { - void register(Protocol protocol); - - void resetAll(Protocol protocol); - - byte type(String protocolName); - - Protocol get(byte type); - - void clearAll(); -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java deleted file mode 100644 index d5c0aaa..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.common; - -import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; -import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; -import org.apache.rocketmq.remoting.impl.protocol.Httpv2Protocol; -import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl; -import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer; -import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; - -public class RemotingCommandFactoryMeta { - private final ProtocolFactory protocolFactory = new ProtocolFactoryImpl(); - private final SerializerFactory serializerFactory = new SerializerFactoryImpl(); - private byte protocolType = Httpv2Protocol.MVP_MAGIC; - private byte serializeType = MsgPackSerializer.SERIALIZER_TYPE; - - public RemotingCommandFactoryMeta() { - } - - public RemotingCommandFactoryMeta(String protocolName, String serializeName) { - this.protocolType = protocolFactory.type(protocolName); - this.serializeType = serializerFactory.type(serializeName); - } - - public byte getSerializeType() { - return serializeType; - } - - public byte getProtocolType() { - return protocolType; - } - -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java index b330041..f7a4b67 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java @@ -19,9 +19,8 @@ package org.apache.rocketmq.remoting.config; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.rocketmq.remoting.api.protocol.Protocol; import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor; -import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer; +import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer; public class RemotingConfig extends TcpSocketConfig { private int connectionMaxRetries = 3; @@ -38,8 +37,7 @@ public class RemotingConfig extends TcpSocketConfig { private int threadTaskLowWaterMark = 30000; private int threadTaskHighWaterMark = 50000; private int connectionRetryBackoffMillis = 3000; - private String protocolName = Protocol.MVP; - private String serializerName = MsgPackSerializer.SERIALIZER_NAME; + private String serializerName = JsonSerializer.SERIALIZER_NAME; private String compressorName = GZipCompressor.COMPRESSOR_NAME; private int serviceThreadBlockQueueSize = 50000; private boolean clientNativeEpollEnable = false; @@ -149,14 +147,6 @@ public class RemotingConfig extends TcpSocketConfig { this.connectionRetryBackoffMillis = connectionRetryBackoffMillis; } - public String getProtocolName() { - return protocolName; - } - - public void setProtocolName(final String protocolName) { - this.protocolName = protocolName; - } - public String getSerializerName() { return serializerName; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java index f5d2126..6e1efaa 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java @@ -20,24 +20,28 @@ package org.apache.rocketmq.remoting.impl.command; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.command.TrafficType; -import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta; +import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; +import org.apache.rocketmq.remoting.impl.protocol.serializer.JsonSerializer; +import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; public class RemotingCommandFactoryImpl implements RemotingCommandFactory { - private RemotingCommandFactoryMeta remotingCommandFactoryMeta; + private final SerializerFactory serializerFactory = new SerializerFactoryImpl(); + private byte serializeType = JsonSerializer.SERIALIZER_TYPE; + + private byte PROTOCOL_MAGIC = 0x14; public RemotingCommandFactoryImpl() { - this(new RemotingCommandFactoryMeta()); } - public RemotingCommandFactoryImpl(final RemotingCommandFactoryMeta remotingCommandFactoryMeta) { - this.remotingCommandFactoryMeta = remotingCommandFactoryMeta; + public RemotingCommandFactoryImpl(final String serializeName) { + this.serializeType = serializerFactory.type(serializeName); } @Override public RemotingCommand createRequest() { RemotingCommand request = new RemotingCommandImpl(); - request.protocolType(this.remotingCommandFactoryMeta.getProtocolType()); - request.serializerType(this.remotingCommandFactoryMeta.getSerializeType()); + request.protocolType(this.PROTOCOL_MAGIC); + request.serializerType(this.serializeType); return request; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 82b17f4..a4c33e1 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -48,19 +48,16 @@ import org.apache.rocketmq.remoting.api.interceptor.Interceptor; import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.api.interceptor.RequestContext; import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; -import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; import org.apache.rocketmq.remoting.api.serializable.Serializer; import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup; import org.apache.rocketmq.remoting.common.Pair; -import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta; import org.apache.rocketmq.remoting.common.ResponseResult; import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.config.RemotingConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; -import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl; import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; import org.apache.rocketmq.remoting.internal.UIDGenerator; import org.jetbrains.annotations.NotNull; @@ -69,7 +66,6 @@ import org.slf4j.LoggerFactory; public abstract class NettyRemotingAbstract implements RemotingService { protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class); - protected final ProtocolFactory protocolFactory = new ProtocolFactoryImpl(); protected final SerializerFactory serializerFactory = new SerializerFactoryImpl(); protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor"); private final Semaphore semaphoreOneway; @@ -86,16 +82,12 @@ public abstract class NettyRemotingAbstract implements RemotingService { private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup(); NettyRemotingAbstract(RemotingConfig clientConfig) { - this(clientConfig, new RemotingCommandFactoryMeta()); - } - - NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); this.publicExecutor = ThreadUtils.newFixedThreadPool( clientConfig.getClientAsyncCallbackExecutorThreads(), 10000, "Remoting-PublicExecutor", true); - this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); + this.remotingCommandFactory = new RemotingCommandFactoryImpl(clientConfig.getSerializerName()); } public SerializerFactory getSerializerFactory() { @@ -516,11 +508,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { } @Override - public ProtocolFactory protocolFactory() { - return this.protocolFactory; - } - - @Override public SerializerFactory serializerFactory() { return this.serializerFactory; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index 7481574..faead7f 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -33,13 +33,6 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http2.Http2SecurityUtil; -import io.netty.handler.ssl.OpenSsl; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; -import io.netty.handler.ssl.SupportedCipherSuiteFilter; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; @@ -52,21 +45,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import javax.net.ssl.SSLException; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; -import org.apache.rocketmq.remoting.api.protocol.Protocol; -import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta; import org.apache.rocketmq.remoting.config.RemotingConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler; -import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler; import org.apache.rocketmq.remoting.internal.JvmUtils; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { @@ -80,10 +69,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); private final Lock lockChannelTables = new ReentrantLock(); private EventExecutorGroup workerGroup; - private SslContext sslContext; NettyRemotingClient(final RemotingConfig clientConfig) { - super(clientConfig, new RemotingCommandFactoryMeta(clientConfig.getProtocolName(), clientConfig.getSerializerName())); + super(clientConfig); this.clientConfig = clientConfig; if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) { @@ -98,10 +86,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); - - if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) { - buildSslContext(); - } } private void applyOptions(Bootstrap bootstrap) { @@ -134,9 +118,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) { - ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()), Http2Handler.newHandler(false)); - } ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()), new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler()); @@ -148,21 +129,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti startUpHouseKeepingService(); } - private void buildSslContext() { - SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; - try { - sslContext = SslContextBuilder.forClient() - .sslProvider(provider) - /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification. - * Please refer to the HTTP/2 specification for cipher requirements. */ - .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .build(); - } catch (SSLException e) { - e.printStackTrace(); - } - } - @Override public void stop() { // try { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index d875f95..0d6a2cc 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -35,13 +35,6 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http2.Http2SecurityUtil; -import io.netty.handler.ssl.OpenSsl; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; -import io.netty.handler.ssl.SupportedCipherSuiteFilter; -import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; @@ -58,7 +51,8 @@ import org.apache.rocketmq.remoting.config.RemotingConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; import org.apache.rocketmq.remoting.impl.netty.handler.ChannelStatistics; -import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector; +import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; +import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; import org.apache.rocketmq.remoting.internal.JvmUtils; public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { @@ -71,7 +65,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private Class<? extends ServerSocketChannel> socketChannelClass; private int port; - private SslContext sslContext; NettyRemotingServer(final RemotingConfig serverConfig) { super(serverConfig); @@ -99,23 +92,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); - - buildHttp2SslContext(); - } - - private void buildHttp2SslContext() { - try { - SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; - SelfSignedCertificate ssc; - //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification. - //Please refer to the HTTP/2 specification for cipher requirements. - ssc = new SelfSignedCertificate(); - sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) - .sslProvider(provider) - .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build(); - } catch (Exception e) { - LOG.error("Can not build SSL context !", e); - } } private void applyOptions(ServerBootstrap bootstrap) { @@ -162,9 +138,10 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ChannelPipeline cp = ch.pipeline(); cp.addLast(ChannelStatistics.NAME, new ChannelStatistics(channels)); - - cp.addFirst(ProtocolSelector.NAME, new ProtocolSelector(sslContext)); - cp.addLast(workerGroup, new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(), + cp.addLast(workerGroup, + new Encoder(), + new Decoder(), + new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(), serverConfig.getConnectionChannelWriterIdleSeconds(), serverConfig.getConnectionChannelIdleSeconds()), new ServerConnectionHandler(), diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java deleted file mode 100644 index 7cdb976..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Http2Handler.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; -import io.netty.handler.codec.http2.DefaultHttp2FrameReader; -import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder; -import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; -import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2ConnectionDecoder; -import io.netty.handler.codec.http2.Http2ConnectionEncoder; -import io.netty.handler.codec.http2.Http2ConnectionHandler; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2FrameAdapter; -import io.netty.handler.codec.http2.Http2FrameReader; -import io.netty.handler.codec.http2.Http2FrameWriter; -import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2HeadersDecoder; -import io.netty.handler.codec.http2.Http2Settings; -import io.netty.handler.codec.http2.StreamBufferingEncoder; - -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO; - -public class Http2Handler extends Http2ConnectionHandler { - - private boolean isServer; - private int lastStreamId; - - private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder, - final Http2Settings initialSettings, final boolean isServer) { - super(decoder, encoder, initialSettings); - decoder.frameListener(new FrameListener()); - this.isServer = isServer; - } - - public static Http2Handler newHandler(final boolean isServer) { - - Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true); - Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); - Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); - - Http2Connection connection = new DefaultHttp2Connection(isServer); - - Http2ConnectionEncoder encoder = new StreamBufferingEncoder( - new DefaultHttp2ConnectionEncoder(connection, frameWriter)); - - connection.local().flowController(new DefaultHttp2LocalFlowController(connection, - DEFAULT_WINDOW_UPDATE_RATIO, true)); - - Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, - frameReader); - - Http2Settings settings = new Http2Settings(); - - if (!isServer) - settings.pushEnabled(true); - - settings.initialWindowSize(1048576 * 10); //10MiB - settings.maxConcurrentStreams(Integer.MAX_VALUE); - - return newHandler(decoder, encoder, settings, isServer); - } - - private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder, - final Http2Settings settings, boolean isServer) { - return new Http2Handler(decoder, encoder, settings, isServer); - } - - @Override - public void write(final ChannelHandlerContext ctx, final Object msg, - final ChannelPromise promise) throws Exception { - if (isServer) { - assert msg instanceof ByteBuf; - sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg); - } else { - - final Http2Headers headers = new DefaultHttp2Headers(); - - try { - long threadId = Thread.currentThread().getId(); - long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2; - encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise); - encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise()); - ctx.flush(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - } - - private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId, - ByteBuf payload) throws Http2Exception { - - encoder().writePushPromise(ctx, streamId, pushPromiseStreamId, - new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise()); - - //Http2Stream stream = connection.local().reservePushStream(pushPromiseStreamId, connection.connectionStream()); - Http2Headers headers = new DefaultHttp2Headers(); - headers.status(OK.codeAsText()); - encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise()); - encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, ctx.newPromise()); - } - - private class FrameListener extends Http2FrameAdapter { - @Override - public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream) throws Http2Exception { - //Http2Handler.this.onDataRead(ctx, streamId, data, endOfStream); - data.retain(); - Http2Handler.this.lastStreamId = streamId; - ctx.fireChannelRead(data); - return data.readableBytes() + padding; - } - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java deleted file mode 100644 index e00a213..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/ProtocolSelector.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.netty.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.ssl.SslContext; -import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; -import org.apache.rocketmq.remoting.api.protocol.Protocol; -import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; -import org.apache.rocketmq.remoting.impl.channel.ChannelHandlerContextWrapperImpl; -import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ProtocolSelector extends SimpleChannelInboundHandler<ByteBuf> { - public static final String NAME = ProtocolSelector.class.getSimpleName(); - private static final Logger LOG = LoggerFactory.getLogger(ProtocolSelector.class); - private ProtocolFactory protocolFactory; - - public ProtocolSelector(final SslContext sslContext) { - this.protocolFactory = new ProtocolFactoryImpl(sslContext); - } - - @Override - protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception { - if (msg.readableBytes() < 1) { - return; - } - msg.markReaderIndex(); - Protocol protocol = protocolFactory.get(msg.readByte()); - if (protocol == null) { - ctx.channel().close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - LOG.warn("Close channel {},result is {}", ctx.channel(), future.isSuccess()); - } - }); - return; - } - ChannelHandlerContextWrapper chcw = new ChannelHandlerContextWrapperImpl(ctx); - protocol.assembleHandler(chcw); - msg.resetReaderIndex(); - ctx.pipeline().remove(this); - ctx.fireChannelRead(msg.retain()); - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java deleted file mode 100644 index 1dc371d..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/Httpv2Protocol.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.ssl.SslContext; -import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; -import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler; -import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector; - -public class Httpv2Protocol extends RemotingCoreProtocol { - private SslContext sslContext; - - public Httpv2Protocol(final SslContext sslContext) { - this.sslContext = sslContext; - } - - @Override - public String name() { - return HTTP2; - } - - @Override - public byte type() { - return HTTP_2_MAGIC; - } - - @Override - public void assembleHandler(final ChannelHandlerContextWrapper ctx) { - super.assembleHandler(ctx); - ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext(); - - chx.pipeline().addAfter(ProtocolSelector.NAME, "sslHandler", sslContext.newHandler(chx.alloc())); - chx.pipeline().addAfter("sslHandler", "http2Handler", Http2Handler.newHandler(true)); - } - -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java deleted file mode 100644 index 15322be..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/ProtocolFactoryImpl.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol; - -import io.netty.handler.ssl.SslContext; -import org.apache.rocketmq.remoting.api.protocol.Protocol; -import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; - -public class ProtocolFactoryImpl implements ProtocolFactory { - private static final int MAX_COUNT = 0x0FF; - private final Protocol[] tables = new Protocol[MAX_COUNT]; - - private SslContext sslContext; - - public ProtocolFactoryImpl(final SslContext sslContext) { - this.sslContext = sslContext; - this.register(new RemotingCoreProtocol()); - this.register(new Httpv2Protocol(sslContext)); - this.register(new WebSocketProtocol()); - } - - public ProtocolFactoryImpl() { - this.register(new RemotingCoreProtocol()); - this.register(new Httpv2Protocol(sslContext)); - this.register(new WebSocketProtocol()); - } - - @Override - public void register(Protocol protocol) { - if (tables[protocol.type() & MAX_COUNT] != null) { - throw new RuntimeException("protocol header's sign is overlapped"); - } - tables[protocol.type() & MAX_COUNT] = protocol; - } - - @Override - public void resetAll(final Protocol protocol) { - for (int i = 0; i < MAX_COUNT; i++) { - tables[i] = protocol; - } - } - - @Override - public byte type(final String protocolName) { - - for (int i = 0; i < this.tables.length; i++) { - if (this.tables[i] != null) { - if (this.tables[i].name().equalsIgnoreCase(protocolName)) { - return this.tables[i].type(); - } - } - } - - throw new IllegalArgumentException(String.format("the protocol: %s not exist", protocolName)); - } - - @Override - public Protocol get(byte type) { - return tables[type & MAX_COUNT]; - } - - @Override - public void clearAll() { - for (int i = 0; i < this.tables.length; i++) { - this.tables[i] = null; - } - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java deleted file mode 100644 index 317b24f..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/RemotingCoreProtocol.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol; - -import io.netty.channel.ChannelHandlerContext; -import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; -import org.apache.rocketmq.remoting.api.protocol.Protocol; -import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; -import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; -import org.apache.rocketmq.remoting.impl.netty.handler.ProtocolSelector; - -public class RemotingCoreProtocol implements Protocol { - @Override - public String name() { - return MVP; - } - - @Override - public byte type() { - return MVP_MAGIC; - } - - @Override - public void assembleHandler(final ChannelHandlerContextWrapper ctx) { - - ChannelHandlerContext chx = (ChannelHandlerContext) ctx.getContext(); - - chx.pipeline().addAfter(ProtocolSelector.NAME, "decoder", new Decoder()); - chx.pipeline().addAfter("decoder", "encoder", new Encoder()); - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java deleted file mode 100644 index 18a3a11..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/protocol/WebSocketProtocol.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.impl.protocol; - -import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; -import org.apache.rocketmq.remoting.api.protocol.Protocol; - -public class WebSocketProtocol implements Protocol { - @Override - public String name() { - return WEBSOCKET; - } - - @Override - public byte type() { - return WEBSOCKET_MAGIC; - } - - @Override - public void assembleHandler(final ChannelHandlerContextWrapper ctx) { - - } -}
