This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 7eeca295876cb0ee9fb82da6322234c9d18793de Author: yukon <[email protected]> AuthorDate: Wed Jun 5 11:54:36 2019 +0800 Fix some bugs and polish the netty transport implementation --- benchmarks/remoting-benchmark/pom.xml | 4 +- .../benchmarks/remoting/AbstractBenchmark.java | 54 ++-- .../remoting/RemotingBootstrapFactory.java | 19 +- .../rocketmq/remoting/common/ResponseFuture.java | 1 - .../remoting/config/RemotingClientConfig.java | 117 +++++++++ .../rocketmq/remoting/config/RemotingConfig.java | 281 ++------------------- .../remoting/config/RemotingServerConfig.java | 108 ++++++++ .../rocketmq/remoting/config/TcpSocketConfig.java | 24 +- .../remoting/impl/command/CodecHelper.java | 4 +- .../remoting/impl/netty/ClientChannelManager.java | 10 +- .../remoting/impl/netty/NettyChannelEventType.java | 4 +- .../remoting/impl/netty/NettyRemotingAbstract.java | 27 +- .../remoting/impl/netty/NettyRemotingClient.java | 40 +-- .../remoting/impl/netty/NettyRemotingServer.java | 35 ++- .../org/apache/rocketmq/remoting/BaseTest.java | 3 +- .../common/SemaphoreReleaseOnlyOnceTest.java | 2 +- .../command/RemotingCommandFactoryImplTest.java | 5 +- .../impl/command/RequestIdGeneratorTest.java | 2 +- .../impl/netty/ClientChannelManagerTest.java | 4 +- .../remoting/impl/netty/handler/EncoderTest.java | 1 - 20 files changed, 358 insertions(+), 387 deletions(-) diff --git a/benchmarks/remoting-benchmark/pom.xml b/benchmarks/remoting-benchmark/pom.xml index e103ae8..a18838d 100644 --- a/benchmarks/remoting-benchmark/pom.xml +++ b/benchmarks/remoting-benchmark/pom.xml @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq-x</artifactId> diff --git a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java index b3754e9..04ca3cf 100644 --- a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java +++ b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java @@ -21,38 +21,16 @@ import org.apache.rocketmq.remoting.RemotingBootstrapFactory; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.command.RemotingCommand; -import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.config.RemotingServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AbstractBenchmark { protected static final Logger LOG = LoggerFactory.getLogger(AbstractBenchmark.class); - /** - * Standard message sizes. - */ - public enum MessageSize { - SMALL(16), MEDIUM(1024), LARGE(65536), JUMBO(1048576); - - private final int bytes; - MessageSize(int bytes) { - this.bytes = bytes; - } - - public int bytes() { - return bytes; - } - } - - /** - * Support channel types. - */ - public enum ChannelType { - NIO, LOCAL; - } - public static void main(String[] args) throws InterruptedException { - RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingConfig()); + RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingServerConfig()); server.registerRequestProcessor((short) 1, (channel, request) -> { RemotingCommand response = server.commandFactory().createResponse(request); @@ -62,7 +40,7 @@ public class AbstractBenchmark { }); server.start(); - RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingConfig()); + RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingClientConfig()); client.start(); RemotingCommand request = client.commandFactory().createRequest(); @@ -75,4 +53,28 @@ public class AbstractBenchmark { client.stop(); server.stop(); } + + /** + * Standard message sizes. + */ + public enum MessageSize { + SMALL(16), MEDIUM(1024), LARGE(65536), JUMBO(1048576); + + private final int bytes; + + MessageSize(int bytes) { + this.bytes = bytes; + } + + public int bytes() { + return bytes; + } + } + + /** + * Support channel types. + */ + public enum ChannelType { + NIO, LOCAL; + } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java index efa4078..84ae102 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java @@ -19,7 +19,8 @@ package org.apache.rocketmq.remoting; import java.util.Properties; import org.apache.rocketmq.remoting.api.RemotingClient; -import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.config.RemotingServerConfig; import org.apache.rocketmq.remoting.impl.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.impl.netty.NettyRemotingServer; import org.apache.rocketmq.remoting.internal.BeanUtils; @@ -30,33 +31,33 @@ import org.jetbrains.annotations.NotNull; * Remoting Bootstrap entrance. */ public final class RemotingBootstrapFactory { - public static RemotingClient createRemotingClient(@NotNull final String fileName) { - Properties prop = PropertyUtils.loadProps(fileName); - RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class); + public static RemotingClient createRemotingClient(@NotNull final RemotingClientConfig config) { return new NettyRemotingClient(config); } - public static RemotingClient createRemotingClient(@NotNull final RemotingConfig config) { + public static RemotingClient createRemotingClient(@NotNull final String fileName) { + Properties prop = PropertyUtils.loadProps(fileName); + RemotingClientConfig config = BeanUtils.populate(prop, RemotingClientConfig.class); return new NettyRemotingClient(config); } public static RemotingClient createRemotingClient(@NotNull final Properties properties) { - RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + RemotingClientConfig config = BeanUtils.populate(properties, RemotingClientConfig.class); return new NettyRemotingClient(config); } public static NettyRemotingServer createRemotingServer(@NotNull final String fileName) { Properties prop = PropertyUtils.loadProps(fileName); - RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class); + RemotingServerConfig config = BeanUtils.populate(prop, RemotingServerConfig.class); return new NettyRemotingServer(config); } public static NettyRemotingServer createRemotingServer(@NotNull final Properties properties) { - RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + RemotingServerConfig config = BeanUtils.populate(properties, RemotingServerConfig.class); return new NettyRemotingServer(config); } - public static NettyRemotingServer createRemotingServer(@NotNull final RemotingConfig config) { + public static NettyRemotingServer createRemotingServer(@NotNull final RemotingServerConfig config) { return new NettyRemotingServer(config); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java index 8a2aec2..e6c394b 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java @@ -24,7 +24,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.command.RemotingCommand; -import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; import org.jetbrains.annotations.Nullable; public class ResponseFuture { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java new file mode 100644 index 0000000..8d77388 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.config; + +public class RemotingClientConfig extends RemotingConfig { + private int connectTimeoutMillis = 3000; + + private boolean clientNativeEpollEnable = false; + + private int clientIoThreads = 1; + private int clientWorkerThreads = 4; + + private int clientOnewayInvokeSemaphore = 65535; + private int clientAsyncInvokeSemaphore = 65535; + + private boolean clientPooledBytebufAllocatorEnable = false; + + private boolean clientCloseSocketIfTimeout = false; + private boolean clientShortConnectionEnable = false; + + public boolean isClientNativeEpollEnable() { + return clientNativeEpollEnable; + } + + public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) { + this.clientNativeEpollEnable = clientNativeEpollEnable; + } + + public int getClientIoThreads() { + return clientIoThreads; + } + + public void setClientIoThreads(final int clientIoThreads) { + this.clientIoThreads = clientIoThreads; + } + + public int getClientWorkerThreads() { + return clientWorkerThreads; + } + + public void setClientWorkerThreads(final int clientWorkerThreads) { + this.clientWorkerThreads = clientWorkerThreads; + } + + public int getClientOnewayInvokeSemaphore() { + return clientOnewayInvokeSemaphore; + } + + public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) { + this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore; + } + + public int getClientAsyncInvokeSemaphore() { + return clientAsyncInvokeSemaphore; + } + + public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) { + this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore; + } + + public boolean isClientPooledBytebufAllocatorEnable() { + return clientPooledBytebufAllocatorEnable; + } + + public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) { + this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable; + } + + public boolean isClientCloseSocketIfTimeout() { + return clientCloseSocketIfTimeout; + } + + public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) { + this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout; + } + + public boolean isClientShortConnectionEnable() { + return clientShortConnectionEnable; + } + + public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) { + this.clientShortConnectionEnable = clientShortConnectionEnable; + } + + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(final int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } + + @Override + public int getOnewayInvokeSemaphore() { + return this.clientOnewayInvokeSemaphore; + } + + @Override + public int getAsyncInvokeSemaphore() { + return this.clientAsyncInvokeSemaphore; + } +} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java index 9fa79c2..d6f636b 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java @@ -17,67 +17,26 @@ package org.apache.rocketmq.remoting.config; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; - -public class RemotingConfig extends TcpSocketConfig { - private int connectionMaxRetries = 3; - private int connectionChannelReaderIdleSeconds = 0; - private int connectionChannelWriterIdleSeconds = 0; +public abstract class RemotingConfig extends TcpSocketConfig { /** * IdleStateEvent will be triggered when neither read nor write was * performed for the specified period of this time. Specify {@code 0} to * disable */ + private int connectionChannelReaderIdleSeconds = 0; + private int connectionChannelWriterIdleSeconds = 0; private int connectionChannelIdleSeconds = 120; + private int writeBufLowWaterMark = 32 * 10240; private int writeBufHighWaterMark = 64 * 10240; - private int threadTaskLowWaterMark = 30000; - private int threadTaskHighWaterMark = 50000; - private int connectionRetryBackoffMillis = 3000; - private int serviceThreadBlockQueueSize = 50000; - private boolean clientNativeEpollEnable = false; - private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; - private int clientConnectionFutureAwaitTimeoutMillis = 3000; - private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; - private int clientOnewayInvokeSemaphore = 20480; - //=============Server configuration================== - private int clientAsyncInvokeSemaphore = 20480; - private boolean clientPooledBytebufAllocatorEnable = false; - private boolean clientCloseSocketIfTimeout = true; - private boolean clientShortConnectionEnable = false; - private long clientPublishServiceTimeout = 10000; - private long clientConsumerServiceTimeout = 10000; - private long clientInvokeServiceTimeout = 10000; - private int clientMaxRetryCount = 10; - private int clientSleepBeforeRetry = 100; - private int serverListenPort = 8888; - /** - * If server only listened 1 port,recommend to set the value to 1 - */ - private int serverAcceptorThreads = 1; - private int serverIoThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; - private int serverWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; - private int serverOnewayInvokeSemaphore = 256; - private int serverAsyncInvokeSemaphore = 6400; - private boolean serverNativeEpollEnable = false; - private int serverAsyncCallbackExecutorThreads = Runtime.getRuntime().availableProcessors() * 2; - private boolean serverPooledBytebufAllocatorEnable = true; - private boolean serverAuthOpenEnable = true; + private int asyncHandlerExecutorThreads = Runtime.getRuntime().availableProcessors(); - @Override - public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); - } + private int publicExecutorThreads = 4; - public int getConnectionMaxRetries() { - return connectionMaxRetries; - } + public abstract int getOnewayInvokeSemaphore(); - public void setConnectionMaxRetries(final int connectionMaxRetries) { - this.connectionMaxRetries = connectionMaxRetries; - } + public abstract int getAsyncInvokeSemaphore(); public int getConnectionChannelReaderIdleSeconds() { return connectionChannelReaderIdleSeconds; @@ -119,227 +78,19 @@ public class RemotingConfig extends TcpSocketConfig { this.writeBufHighWaterMark = writeBufHighWaterMark; } - public int getThreadTaskLowWaterMark() { - return threadTaskLowWaterMark; - } - - public void setThreadTaskLowWaterMark(final int threadTaskLowWaterMark) { - this.threadTaskLowWaterMark = threadTaskLowWaterMark; - } - - public int getThreadTaskHighWaterMark() { - return threadTaskHighWaterMark; - } - - public void setThreadTaskHighWaterMark(final int threadTaskHighWaterMark) { - this.threadTaskHighWaterMark = threadTaskHighWaterMark; - } - - public int getConnectionRetryBackoffMillis() { - return connectionRetryBackoffMillis; - } - - public void setConnectionRetryBackoffMillis(final int connectionRetryBackoffMillis) { - this.connectionRetryBackoffMillis = connectionRetryBackoffMillis; - } - - public int getServiceThreadBlockQueueSize() { - return serviceThreadBlockQueueSize; - } - - public void setServiceThreadBlockQueueSize(final int serviceThreadBlockQueueSize) { - this.serviceThreadBlockQueueSize = serviceThreadBlockQueueSize; - } - - public boolean isClientNativeEpollEnable() { - return clientNativeEpollEnable; - } - - public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) { - this.clientNativeEpollEnable = clientNativeEpollEnable; - } - - public int getClientWorkerThreads() { - return clientWorkerThreads; - } - - public void setClientWorkerThreads(final int clientWorkerThreads) { - this.clientWorkerThreads = clientWorkerThreads; - } - - public int getClientConnectionFutureAwaitTimeoutMillis() { - return clientConnectionFutureAwaitTimeoutMillis; - } - - public void setClientConnectionFutureAwaitTimeoutMillis(final int clientConnectionFutureAwaitTimeoutMillis) { - this.clientConnectionFutureAwaitTimeoutMillis = clientConnectionFutureAwaitTimeoutMillis; - } - - public int getClientAsyncCallbackExecutorThreads() { - return clientAsyncCallbackExecutorThreads; - } - - public void setClientAsyncCallbackExecutorThreads(final int clientAsyncCallbackExecutorThreads) { - this.clientAsyncCallbackExecutorThreads = clientAsyncCallbackExecutorThreads; - } - - public int getClientOnewayInvokeSemaphore() { - return clientOnewayInvokeSemaphore; - } - - public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) { - this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore; - } - - public int getClientAsyncInvokeSemaphore() { - return clientAsyncInvokeSemaphore; - } - - public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) { - this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore; - } - - public boolean isClientPooledBytebufAllocatorEnable() { - return clientPooledBytebufAllocatorEnable; - } - - public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) { - this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable; - } - - public boolean isClientCloseSocketIfTimeout() { - return clientCloseSocketIfTimeout; - } - - public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) { - this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout; - } - - public boolean isClientShortConnectionEnable() { - return clientShortConnectionEnable; - } - - public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) { - this.clientShortConnectionEnable = clientShortConnectionEnable; - } - - public long getClientPublishServiceTimeout() { - return clientPublishServiceTimeout; - } - - public void setClientPublishServiceTimeout(final long clientPublishServiceTimeout) { - this.clientPublishServiceTimeout = clientPublishServiceTimeout; - } - - public long getClientConsumerServiceTimeout() { - return clientConsumerServiceTimeout; - } - - public void setClientConsumerServiceTimeout(final long clientConsumerServiceTimeout) { - this.clientConsumerServiceTimeout = clientConsumerServiceTimeout; - } - - public long getClientInvokeServiceTimeout() { - return clientInvokeServiceTimeout; - } - - public void setClientInvokeServiceTimeout(final long clientInvokeServiceTimeout) { - this.clientInvokeServiceTimeout = clientInvokeServiceTimeout; - } - - public int getClientMaxRetryCount() { - return clientMaxRetryCount; - } - - public void setClientMaxRetryCount(final int clientMaxRetryCount) { - this.clientMaxRetryCount = clientMaxRetryCount; - } - - public int getClientSleepBeforeRetry() { - return clientSleepBeforeRetry; - } - - public void setClientSleepBeforeRetry(final int clientSleepBeforeRetry) { - this.clientSleepBeforeRetry = clientSleepBeforeRetry; - } - - public int getServerListenPort() { - return serverListenPort; - } - - public void setServerListenPort(final int serverListenPort) { - this.serverListenPort = serverListenPort; - } - - public int getServerAcceptorThreads() { - return serverAcceptorThreads; - } - - public void setServerAcceptorThreads(final int serverAcceptorThreads) { - this.serverAcceptorThreads = serverAcceptorThreads; - } - - public int getServerIoThreads() { - return serverIoThreads; - } - - public void setServerIoThreads(final int serverIoThreads) { - this.serverIoThreads = serverIoThreads; - } - - public int getServerWorkerThreads() { - return serverWorkerThreads; - } - - public void setServerWorkerThreads(final int serverWorkerThreads) { - this.serverWorkerThreads = serverWorkerThreads; - } - - public int getServerOnewayInvokeSemaphore() { - return serverOnewayInvokeSemaphore; - } - - public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) { - this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore; - } - - public int getServerAsyncInvokeSemaphore() { - return serverAsyncInvokeSemaphore; - } - - public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) { - this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore; - } - - public boolean isServerNativeEpollEnable() { - return serverNativeEpollEnable; - } - - public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) { - this.serverNativeEpollEnable = serverNativeEpollEnable; - } - - public int getServerAsyncCallbackExecutorThreads() { - return serverAsyncCallbackExecutorThreads; - } - - public void setServerAsyncCallbackExecutorThreads(final int serverAsyncCallbackExecutorThreads) { - this.serverAsyncCallbackExecutorThreads = serverAsyncCallbackExecutorThreads; - } - - public boolean isServerPooledBytebufAllocatorEnable() { - return serverPooledBytebufAllocatorEnable; + public int getAsyncHandlerExecutorThreads() { + return asyncHandlerExecutorThreads; } - public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) { - this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable; + public void setAsyncHandlerExecutorThreads(final int asyncHandlerExecutorThreads) { + this.asyncHandlerExecutorThreads = asyncHandlerExecutorThreads; } - public boolean isServerAuthOpenEnable() { - return serverAuthOpenEnable; + public int getPublicExecutorThreads() { + return publicExecutorThreads; } - public void setServerAuthOpenEnable(final boolean serverAuthOpenEnable) { - this.serverAuthOpenEnable = serverAuthOpenEnable; + public void setPublicExecutorThreads(final int publicExecutorThreads) { + this.publicExecutorThreads = publicExecutorThreads; } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java new file mode 100644 index 0000000..9879364 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.config; + +public class RemotingServerConfig extends RemotingConfig { + private int serverListenPort = 8888; + /** + * If server only listened 1 port,recommend to set the value to 1 + */ + private int serverAcceptorThreads = 1; + private int serverIoThreads = 3; + private int serverWorkerThreads = 8; + + private int serverOnewayInvokeSemaphore = 256; + private int serverAsyncInvokeSemaphore = 64; + + private boolean serverNativeEpollEnable = false; + private boolean serverPooledBytebufAllocatorEnable = true; + + public int getServerListenPort() { + return serverListenPort; + } + + public void setServerListenPort(final int serverListenPort) { + this.serverListenPort = serverListenPort; + } + + public int getServerAcceptorThreads() { + return serverAcceptorThreads; + } + + public void setServerAcceptorThreads(final int serverAcceptorThreads) { + this.serverAcceptorThreads = serverAcceptorThreads; + } + + public int getServerIoThreads() { + return serverIoThreads; + } + + public void setServerIoThreads(final int serverIoThreads) { + this.serverIoThreads = serverIoThreads; + } + + public int getServerWorkerThreads() { + return serverWorkerThreads; + } + + public void setServerWorkerThreads(final int serverWorkerThreads) { + this.serverWorkerThreads = serverWorkerThreads; + } + + public int getServerOnewayInvokeSemaphore() { + return serverOnewayInvokeSemaphore; + } + + public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) { + this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore; + } + + public int getServerAsyncInvokeSemaphore() { + return serverAsyncInvokeSemaphore; + } + + public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) { + this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore; + } + + public boolean isServerNativeEpollEnable() { + return serverNativeEpollEnable; + } + + public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) { + this.serverNativeEpollEnable = serverNativeEpollEnable; + } + + public boolean isServerPooledBytebufAllocatorEnable() { + return serverPooledBytebufAllocatorEnable; + } + + public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) { + this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable; + } + + @Override + public int getOnewayInvokeSemaphore() { + return this.serverOnewayInvokeSemaphore; + } + + @Override + public int getAsyncInvokeSemaphore() { + return this.serverAsyncInvokeSemaphore; + } +} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java index 4dfcde7..d77bf3d 100755 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java @@ -23,14 +23,14 @@ package org.apache.rocketmq.remoting.config; * @see java.net.SocketOptions */ public class TcpSocketConfig { - private boolean tcpSoReuseAddress; - private boolean tcpSoKeepAlive; - private boolean tcpSoNoDelay; - private int tcpSoSndBufSize; // see /proc/sys/net/ipv4/tcp_rmem - private int tcpSoRcvBufSize; // see /proc/sys/net/ipv4/tcp_wmem - private int tcpSoBacklogSize; - private int tcpSoLinger; - private int tcpSoTimeout; + private boolean tcpSoReuseAddress = true; + private boolean tcpSoKeepAlive = false; + private boolean tcpSoNoDelay = true; + private int tcpSoSndBufSize = 65535; // see /proc/sys/net/ipv4/tcp_rmem + private int tcpSoRcvBufSize = 65535; // see /proc/sys/net/ipv4/tcp_wmem + private int tcpSoBacklogSize = 1024; + private int tcpSoLinger = -1; + private int tcpSoTimeoutMillis = 3000; public boolean isTcpSoReuseAddress() { return tcpSoReuseAddress; @@ -88,11 +88,11 @@ public class TcpSocketConfig { this.tcpSoLinger = tcpSoLinger; } - public int getTcpSoTimeout() { - return tcpSoTimeout; + public int getTcpSoTimeoutMillis() { + return tcpSoTimeoutMillis; } - public void setTcpSoTimeout(final int tcpSoTimeout) { - this.tcpSoTimeout = tcpSoTimeout; + public void setTcpSoTimeoutMillis(final int tcpSoTimeoutMillis) { + this.tcpSoTimeoutMillis = tcpSoTimeoutMillis; } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java index 8d3ff3a..988c20c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java @@ -29,12 +29,12 @@ public class CodecHelper { // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4); public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4; public final static byte PROTOCOL_MAGIC = 0x14; - private final static char PROPERTY_SEPARATOR = '\n'; - private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8"); final static int REMARK_MAX_LEN = Short.MAX_VALUE; final static int PROPERTY_MAX_LEN = 524288; // 512KB final static int PAYLOAD_MAX_LEN = 16777216; // 16MB public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN; + private final static char PROPERTY_SEPARATOR = '\n'; + private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8"); public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) { out.writeByte(PROTOCOL_MAGIC); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java index 0b084a0..4f4ec5c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +41,10 @@ public class ClientChannelManager { final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); private final Lock lockChannelTables = new ReentrantLock(); private final Bootstrap clientBootstrap; - private final RemotingConfig clientConfig; + private final RemotingClientConfig clientConfig; ClientChannelManager(final Bootstrap bootstrap, - final RemotingConfig config) { + final RemotingClientConfig config) { clientBootstrap = bootstrap; clientConfig = config; } @@ -106,7 +106,7 @@ public class ClientChannelManager { if (cw != null) { ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) { + if (channelFuture.awaitUninterruptibly(this.clientConfig.getConnectTimeoutMillis())) { if (cw.isActive()) { LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); return cw.getChannel(); @@ -115,7 +115,7 @@ public class ClientChannelManager { this.closeChannel(addr, cw.getChannel()); } } else { - LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(), + LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getConnectTimeoutMillis(), channelFuture.toString()); this.closeChannel(addr, cw.getChannel()); } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java index 1bf2277..432363d 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.remoting.impl.netty; public enum NettyChannelEventType { - ACTIVE, - INACTIVE, + CONNECT, + CLOSE, IDLE, EXCEPTION } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 920a922..cbd0059 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -74,15 +74,20 @@ public abstract class NettyRemotingAbstract implements RemotingService { private final String remotingInstanceId = UIDGenerator.instance().createUID(); private final ExecutorService publicExecutor; + private final ExecutorService asyncHandlerExecutor; protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true); private InterceptorGroup interceptorGroup = new InterceptorGroup(); private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup(); - NettyRemotingAbstract(RemotingConfig clientConfig) { - this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); - this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); + NettyRemotingAbstract(RemotingConfig remotingConfig) { + this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true); + this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true); this.publicExecutor = ThreadUtils.newFixedThreadPool( - clientConfig.getClientAsyncCallbackExecutorThreads(), + remotingConfig.getPublicExecutorThreads(), + 10000, "Remoting-PublicExecutor", true); + + this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool( + remotingConfig.getAsyncHandlerExecutorThreads(), 10000, "Remoting-PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(); } @@ -133,7 +138,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { @Override public void stop() { + ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS); + ThreadUtils.shutdownGracefully(asyncHandlerExecutor, 2000, TimeUnit.MILLISECONDS); ThreadUtils.shutdownGracefully(channelEventExecutor); } @@ -234,16 +241,12 @@ public abstract class NettyRemotingAbstract implements RemotingService { channel.writeAndFlush(msg); } - public ExecutorService getCallbackExecutor() { - return this.publicExecutor; - } - /** * Execute callback in callback executor. If callback executor is null, run directly in current thread */ private void executeAsyncHandler(final ResponseFuture responseFuture) { boolean runInThisThread = false; - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = asyncHandlerExecutor; if (executor != null) { try { executor.submit(new Runnable() { @@ -549,10 +552,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { case IDLE: listener.onChannelIdle(channel); break; - case INACTIVE: + case CLOSE: listener.onChannelClose(channel); break; - case ACTIVE: + case CONNECT: listener.onChannelConnect(channel); break; case EXCEPTION: @@ -571,7 +574,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { } - protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> { + protected class RemotingCommandDispatcher extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index ce30aa2..6b2796e 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -37,18 +37,16 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.net.SocketAddress; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; -import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; -import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler; import org.apache.rocketmq.remoting.internal.JvmUtils; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { @@ -56,21 +54,21 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final EventLoopGroup ioGroup; private final Class<? extends SocketChannel> socketChannelClass; - private final RemotingConfig clientConfig; + private final RemotingClientConfig clientConfig; private EventExecutorGroup workerGroup; private ClientChannelManager clientChannelManager; - public NettyRemotingClient(final RemotingConfig clientConfig) { + public NettyRemotingClient(final RemotingClientConfig clientConfig) { super(clientConfig); this.clientConfig = clientConfig; if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) { - this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", + this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", clientConfig.getClientWorkerThreads())); socketChannelClass = EpollSocketChannel.class; } else { - this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads", + this.ioGroup = new NioEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads", clientConfig.getClientWorkerThreads())); socketChannelClass = NioSocketChannel.class; } @@ -88,15 +86,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass) .handler(new ChannelInitializer<SocketChannel>() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(SocketChannel ch) { ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()), new ClientConnectionHandler(), - new EventDispatcher(), - new ExceptionHandler()); + new RemotingCommandDispatcher()); } }); @@ -108,14 +105,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void stop() { try { - ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); - clientChannelManager.clear(); this.ioGroup.shutdownGracefully(); - ThreadUtils.shutdownGracefully(channelEventExecutor); - this.workerGroup.shutdownGracefully(); } catch (Exception e) { LOG.warn("RemotingClient stopped error !", e); @@ -126,10 +119,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private void applyOptions(Bootstrap bootstrap) { if (null != clientConfig) { - if (clientConfig.getTcpSoLinger() > 0) { - bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger()); - } - if (clientConfig.getTcpSoSndBufSize() > 0) { bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize()); } @@ -137,10 +126,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize()); } - bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()). - option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()). + bootstrap.option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()). option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()). - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()). + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeoutMillis()). option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(), clientConfig.getWriteBufHighWaterMark())); } @@ -206,7 +194,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti LOG.info("Connected from {} to {}.", localAddress, remoteAddress); super.connect(ctx, remoteAddress, localAddress, promise); - putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel())); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel())); } @Override @@ -217,7 +205,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti super.disconnect(ctx, promise); - putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel())); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel())); } @Override @@ -228,11 +216,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti super.close(ctx, promise); - putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel())); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel())); } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { @@ -246,7 +234,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LOG.info("Close channel {} because of error {} ", ctx.channel(), cause); NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel()); putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel())); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index f1e9360..f0dbb45 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -39,12 +39,11 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.channel.RemotingChannel; import org.apache.rocketmq.remoting.api.command.RemotingCommand; -import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.config.RemotingServerConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; @@ -52,7 +51,7 @@ import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; import org.apache.rocketmq.remoting.internal.JvmUtils; public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { - private final RemotingConfig serverConfig; + private final RemotingServerConfig serverConfig; private final ServerBootstrap serverBootstrap; private final EventLoopGroup bossGroup; @@ -62,7 +61,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private int port; - public NettyRemotingServer(final RemotingConfig serverConfig) { + public NettyRemotingServer(final RemotingServerConfig serverConfig) { super(serverConfig); this.serverBootstrap = new ServerBootstrap(); @@ -107,7 +106,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti serverConfig.getConnectionChannelWriterIdleSeconds(), serverConfig.getConnectionChannelIdleSeconds()), new ServerConnectionHandler(), - new EventDispatcher()); + new RemotingCommandDispatcher()); } }); @@ -122,10 +121,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void stop() { try { - ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); - - ThreadUtils.shutdownGracefully(channelEventExecutor); - this.bossGroup.shutdownGracefully().syncUninterruptibly(); this.ioGroup.shutdownGracefully().syncUninterruptibly(); @@ -160,11 +155,11 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()). childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()). childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()). - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout()); - } + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeoutMillis()); - if (serverConfig.isServerPooledBytebufAllocatorEnable()) { - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + if (serverConfig.isServerPooledBytebufAllocatorEnable()) { + bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + } } } @@ -194,28 +189,32 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private class ServerConnectionHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channel {} registered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress()); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channel {} unregistered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress()); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channel {} became active, remote address {}.", ctx.channel(), ctx.channel().remoteAddress()); super.channelActive(ctx); - putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel())); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel())); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + LOG.info("Channel {} became inactive, remote address {}.", ctx.channel(), ctx.channel().remoteAddress()); super.channelInactive(ctx); - putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel())); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel())); } @Override - public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { + public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { final IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { @@ -233,9 +232,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @Override - public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.info("Close channel {} because of error {} ", ctx.channel(), cause); putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause)); - ctx.channel().close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java index aab6dfb..ed7c93a 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java @@ -43,7 +43,8 @@ public class BaseTest { ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS); } - protected void runInThreads(final Runnable runnable, int threadsNum, int timeoutMillis) throws InterruptedException { + protected void runInThreads(final Runnable runnable, int threadsNum, + int timeoutMillis) throws InterruptedException { final Semaphore semaphore = new Semaphore(0); runInThreads(new Runnable() { diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java index b671662..56440e5 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java @@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore; import org.apache.rocketmq.remoting.BaseTest; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class SemaphoreReleaseOnlyOnceTest extends BaseTest { diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java index c1274c6..391c8bd 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java @@ -22,7 +22,10 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class RemotingCommandFactoryImplTest { private RemotingCommandFactory factory = new RemotingCommandFactoryImpl(); diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java index 5686124..4620542 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting.impl.command; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class RequestIdGeneratorTest { diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java index 16084d8..16618e8 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java @@ -26,7 +26,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.BaseTest; -import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -60,7 +60,7 @@ public class ClientChannelManagerTest extends BaseTest { when(channel.close()).thenReturn(channelPromise); when(channel.remoteAddress()).thenReturn(new InetSocketAddress(8080)); - channelManager = new ClientChannelManager(clientBootstrap, new RemotingConfig()); + channelManager = new ClientChannelManager(clientBootstrap, new RemotingClientConfig()); } @Test diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java index 9a379e9..1629342 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java @@ -52,7 +52,6 @@ public class EncoderTest extends BaseTest { assertEquals(request, decodedRequest); } - @Test public void encode_LenOverLimit_ChannelClosed() { EmbeddedChannel channel = new EmbeddedChannel(new Encoder());
