This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 17c5dea02f772ab813ae304ee01a8fe41a5aea53 Author: yukon <[email protected]> AuthorDate: Thu May 16 15:22:24 2019 +0800 Polish netty implementation --- .../rocketmq/remoting/common/ResponseResult.java | 2 +- .../remoting/impl/netty/NettyRemotingAbstract.java | 23 ++++---------- .../remoting/impl/netty/NettyRemotingClient.java | 35 +++++++++++----------- .../remoting/impl/netty/NettyRemotingServer.java | 3 +- 4 files changed, 26 insertions(+), 37 deletions(-) diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java index 2557cdf..92f501f 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java @@ -91,7 +91,7 @@ public class ResponseResult { try { interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand, null, "CALLBACK_TIMEOUT")); - } catch (Throwable e) { + } catch (Throwable ignore) { } if (null != asyncHandler) { asyncHandler.onTimeout(costTimeMillis, timoutMillis); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index b351445..cbb211e 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -32,7 +33,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingEndPoint; import org.apache.rocketmq.remoting.api.RemotingService; @@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.api.channel.RemotingChannel; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext; import org.apache.rocketmq.remoting.api.interceptor.Interceptor; @@ -69,7 +70,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { private final Semaphore semaphoreAsync; private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256); private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>(); - private final AtomicLong responseCounter = new AtomicLong(0); private final RemotingCommandFactory remotingCommandFactory; private final String remotingInstanceId = UIDGenerator.instance().createUID(); @@ -101,7 +101,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { } void scanResponseTable() { - /* Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<Integer, ResponseResult> next = iterator.next(); @@ -113,7 +112,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { long timeoutMillis = result.getTimeoutMillis(); long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp(); result.onTimeout(timeoutMillis, costTimeMillis); - LOG.error("scan response table command {} failed", result.getRequestId()); } catch (Throwable e) { LOG.warn("Error occurred when execute timeout callback !", e); } finally { @@ -122,7 +120,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } } - */ } @Override @@ -166,10 +163,8 @@ public abstract class NettyRemotingAbstract implements RemotingService { try { processorExecutorPair.getRight().submit(run); } catch (RejectedExecutionException e) { - if ((System.currentTimeMillis() % 10000) == 0) { - LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd, - extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); - } + LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd, + extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, @@ -189,12 +184,8 @@ public abstract class NettyRemotingAbstract implements RemotingService { responseResult.setResponseCommand(cmd); responseResult.release(); - long time = System.currentTimeMillis(); ackTables.remove(cmd.requestID()); - if (responseCounter.incrementAndGet() % 5000 == 0) { - LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), - ackTables.size()); - } + if (responseResult.getAsyncHandler() != null) { boolean sameThread = false; ExecutorService executor = this.getCallbackExecutor(); @@ -346,10 +337,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (responseResult.isSendRequestOK()) { throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause()); } - /* else { throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause()); - }*/ + } } return responseCommand; @@ -546,7 +536,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { super(nettyEventExector); this.name = nettyEventExector; } - //private final AtomicBoolean isStopped = new AtomicBoolean(true); public void putNettyEvent(final NettyChannelEvent event) { if (this.eventQueue.size() <= MAX_SIZE) { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index 3dab3db..b9f9a64 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -96,9 +96,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), + ch.pipeline().addLast(workerGroup, + new Decoder(), + new Encoder(), + new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()), - new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler()); + new ClientConnectionHandler(), + new EventDispatcher(), + new ExceptionHandler()); } }); @@ -131,25 +136,23 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void stop() { - // try { - ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); + try { + ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); - for (ChannelWrapper cw : this.channelTables.values()) { - this.closeChannel(null, cw.getChannel()); - } + for (ChannelWrapper cw : this.channelTables.values()) { + this.closeChannel(null, cw.getChannel()); + } - this.channelTables.clear(); + this.channelTables.clear(); - this.ioGroup.shutdownGracefully(); + this.ioGroup.shutdownGracefully(); - ThreadUtils.shutdownGracefully(channelEventExecutor); + ThreadUtils.shutdownGracefully(channelEventExecutor); - this.workerGroup.shutdownGracefully(); - /* + this.workerGroup.shutdownGracefully(); } catch (Exception e) { - LOG.error("RemotingClient stopped error !", e); + LOG.warn("RemotingClient stopped error !", e); } - */ super.stop(); } @@ -263,11 +266,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address); throw e; } finally { - /* if (this.clientConfig.isClientShortConnectionEnable()) { - this.closeChannel(addr, channel); + this.closeChannel(address, channel); } - */ } } else { this.closeChannel(address, channel); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index ec8a243..60aca5e 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -160,7 +160,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void stop() { try { - ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); ThreadUtils.shutdownGracefully(channelEventExecutor); @@ -171,7 +170,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this.workerGroup.shutdownGracefully().syncUninterruptibly(); } catch (Exception e) { - LOG.error("RemotingServer stopped error !", e); + LOG.warn("RemotingServer stopped error !", e); } super.stop();
