This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit 17c5dea02f772ab813ae304ee01a8fe41a5aea53
Author: yukon <[email protected]>
AuthorDate: Thu May 16 15:22:24 2019 +0800

    Polish netty implementation
---
 .../rocketmq/remoting/common/ResponseResult.java   |  2 +-
 .../remoting/impl/netty/NettyRemotingAbstract.java | 23 ++++----------
 .../remoting/impl/netty/NettyRemotingClient.java   | 35 +++++++++++-----------
 .../remoting/impl/netty/NettyRemotingServer.java   |  3 +-
 4 files changed, 26 insertions(+), 37 deletions(-)

diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
index 2557cdf..92f501f 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
@@ -91,7 +91,7 @@ public class ResponseResult {
             try {
                 interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
                     null, "CALLBACK_TIMEOUT"));
-            } catch (Throwable e) {
+            } catch (Throwable ignore) {
             }
             if (null != asyncHandler) {
                 asyncHandler.onTimeout(costTimeMillis, timoutMillis);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index b351445..cbb211e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -32,7 +33,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingEndPoint;
 import org.apache.rocketmq.remoting.api.RemotingService;
@@ -42,6 +42,7 @@ import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
 import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
 import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
@@ -69,7 +70,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     private final Semaphore semaphoreAsync;
     private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
     private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>();
-    private final AtomicLong responseCounter = new AtomicLong(0);
     private final RemotingCommandFactory remotingCommandFactory;
     private final String remotingInstanceId = UIDGenerator.instance().createUID();
 
@@ -101,7 +101,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     }
 
     void scanResponseTable() {
-        /*
         Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
         while (iterator.hasNext()) {
             Map.Entry<Integer, ResponseResult> next = iterator.next();
@@ -113,7 +112,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                     long timeoutMillis = result.getTimeoutMillis();
                     long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
                     result.onTimeout(timeoutMillis, costTimeMillis);
-                    LOG.error("scan response table command {} failed", result.getRequestId());
                 } catch (Throwable e) {
                     LOG.warn("Error occurred when execute timeout callback !", e);
                 } finally {
@@ -122,7 +120,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                 }
             }
         }
-        */
     }
 
     @Override
@@ -166,10 +163,8 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         try {
             processorExecutorPair.getRight().submit(run);
         } catch (RejectedExecutionException e) {
-            if ((System.currentTimeMillis() % 10000) == 0) {
-                LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
-                    extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
-            }
+            LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
+                extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
 
             if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
                 interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE,
@@ -189,12 +184,8 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             responseResult.setResponseCommand(cmd);
             responseResult.release();
 
-            long time = System.currentTimeMillis();
             ackTables.remove(cmd.requestID());
-            if (responseCounter.incrementAndGet() % 5000 == 0) {
-                LOG.info("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(),
-                    ackTables.size());
-            }
+
             if (responseResult.getAsyncHandler() != null) {
                 boolean sameThread = false;
                 ExecutorService executor = this.getCallbackExecutor();
@@ -346,10 +337,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                 if (responseResult.isSendRequestOK()) {
                     throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause());
                 }
-                /*
                 else {
                     throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause());
-                }*/
+                }
             }
 
             return responseCommand;
@@ -546,7 +536,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             super(nettyEventExector);
             this.name = nettyEventExector;
         }
-        //private final AtomicBoolean isStopped = new AtomicBoolean(true);
 
         public void putNettyEvent(final NettyChannelEvent event) {
             if (this.eventQueue.size() <= MAX_SIZE) {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index 3dab3db..b9f9a64 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -96,9 +96,14 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
-                    ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
+                    ch.pipeline().addLast(workerGroup,
+                        new Decoder(),
+                        new Encoder(),
+                        new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
                             clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
-                        new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler());
+                        new ClientConnectionHandler(),
+                        new EventDispatcher(),
+                        new ExceptionHandler());
                 }
             });
 
@@ -131,25 +136,23 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
     @Override
     public void stop() {
-        // try {
-        ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
+        try {
+            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
 
-        for (ChannelWrapper cw : this.channelTables.values()) {
-            this.closeChannel(null, cw.getChannel());
-        }
+            for (ChannelWrapper cw : this.channelTables.values()) {
+                this.closeChannel(null, cw.getChannel());
+            }
 
-        this.channelTables.clear();
+            this.channelTables.clear();
 
-        this.ioGroup.shutdownGracefully();
+            this.ioGroup.shutdownGracefully();
 
-        ThreadUtils.shutdownGracefully(channelEventExecutor);
+            ThreadUtils.shutdownGracefully(channelEventExecutor);
 
-        this.workerGroup.shutdownGracefully();
-        /*
+            this.workerGroup.shutdownGracefully();
         } catch (Exception e) {
-            LOG.error("RemotingClient stopped error !", e);
+            LOG.warn("RemotingClient stopped error !", e);
         }
-        */
 
         super.stop();
     }
@@ -263,11 +266,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address);
                 throw e;
             } finally {
-                /*
                 if (this.clientConfig.isClientShortConnectionEnable()) {
-                    this.closeChannel(addr, channel);
+                    this.closeChannel(address, channel);
                 }
-                */
             }
         } else {
             this.closeChannel(address, channel);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index ec8a243..60aca5e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -160,7 +160,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     @Override
     public void stop() {
         try {
-
             ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
 
             ThreadUtils.shutdownGracefully(channelEventExecutor);
@@ -171,7 +170,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
             this.workerGroup.shutdownGracefully().syncUninterruptibly();
         } catch (Exception e) {
-            LOG.error("RemotingServer stopped error !", e);
+            LOG.warn("RemotingServer stopped error !", e);
         }
 
         super.stop();

Reply via email to