This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8357cff10 [ISSUE #5002]Fix code style in any net modules. (#4989)
8357cff10 is described below

commit 8357cff10156860ffbedc276d79f90609aa3bb85
Author: CoedPig <[email protected]>
AuthorDate: Mon Sep 5 13:38:26 2022 +0800

    [ISSUE #5002]Fix code style in any net modules. (#4989)
    
    * optimize the memory usage in DefaultMappedFile:
    use AtomicIntegerFieldUpdater instead of AtomicInteger
    
    * fix code bug:
    if the initial cursor of listIterator equals zero,
    the previous element will always be null.
    
    * modify constant name
    
    * fix excessive nesting
    
    * Update 
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
    
    * update maven-surefire-plugin.version from 2.19.1 to 2.22.1
    
    * Revert "update maven-surefire-plugin.version from 2.19.1 to 2.22.1"
    
    This reverts commit 37fabe6e7805826895e224b0da22b57750c7e643.
    
    * Code Style: fix warn info from the code detector.
    
    * fix checkstyle: not use '*' for import.
    
    * Fix code style in any net modules.
    
    * change method name from topicRouteDataIsChange to topicRouteDataChanged 
in TopicRouteData.java
    
    * revert executeInvokeCallback method
    
    * Fix grammar issues
    
    Co-authored-by: shanpengyu <[email protected]>
    Co-authored-by: Aaron Ai <[email protected]>
    Co-authored-by: Zhanhui Li <[email protected]>
---
 .../broker/topic/TopicRouteInfoManager.java        |   2 +-
 .../client/impl/factory/MQClientInstance.java      |   2 +-
 .../rocketmq/common/protocol/route/BrokerData.java |  25 +-
 .../rocketmq/common/protocol/route/QueueData.java  |   8 +-
 .../common/protocol/route/TopicRouteData.java      |   2 +-
 .../apache/rocketmq/common/rpc/ClientMetadata.java |   2 +-
 .../apache/rocketmq/namesrv/NamesrvController.java | 169 ++++---
 .../apache/rocketmq/namesrv/NamesrvStartup.java    |  26 +-
 .../namesrv/routeinfo/RouteInfoManager.java        | 509 ++++++++++-----------
 .../remoting/netty/NettyRemotingAbstract.java      | 256 +++++------
 .../remoting/netty/NettyRemotingServer.java        | 220 ++++-----
 11 files changed, 581 insertions(+), 640 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index 0f891eded..4a51c7dc2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -134,7 +134,7 @@ public class TopicRouteInfoManager {
 
     private boolean updateTopicRouteTable(String topic, TopicRouteData 
topicRouteData) {
         TopicRouteData old = this.topicRouteTable.get(topic);
-        boolean changed = topicRouteData.topicRouteDataIsChange(old);
+        boolean changed = topicRouteData.topicRouteDataChanged(old);
         if (!changed) {
             if (!this.isNeedUpdateTopicRouteInfo(topic)) {
                 return false;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index e88b70b74..820faf2f2 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -611,7 +611,7 @@ public class MQClientInstance {
                     }
                     if (topicRouteData != null) {
                         TopicRouteData old = this.topicRouteTable.get(topic);
-                        boolean changed = 
topicRouteData.topicRouteDataIsChange(old);
+                        boolean changed = 
topicRouteData.topicRouteDataChanged(old);
                         if (!changed) {
                             changed = this.isNeedUpdateTopicRouteInfo(topic);
                         } else {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index f13491350..47c53f8c3 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -20,16 +20,24 @@ package org.apache.rocketmq.common.protocol.route;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 
+/**
+ * Describes the details of a typical broker cluster (in replication): 
the name of the cluster (in sharding)
+ * that it belongs to, and the information of every single instance in this 
cluster.
+ */
 public class BrokerData implements Comparable<BrokerData> {
     private String cluster;
     private String brokerName;
-    private HashMap<Long/* brokerId */, String/* broker address */> 
brokerAddrs;
+
+    /**
+     * The container that stores all the single instances of the current 
broker replication cluster.
+     * The key is the brokerId, and the value is the address of the single 
broker instance.
+     */
+    private HashMap<Long, String> brokerAddrs;
     private String zoneName;
     private final Random random = new Random();
 
@@ -46,10 +54,7 @@ public class BrokerData implements Comparable<BrokerData> {
         this.cluster = brokerData.cluster;
         this.brokerName = brokerData.brokerName;
         if (brokerData.brokerAddrs != null) {
-            this.brokerAddrs = new HashMap<Long, String>();
-            for (final Map.Entry<Long, String> brokerEntry : 
brokerData.brokerAddrs.entrySet()) {
-                this.brokerAddrs.put(brokerEntry.getKey(), 
brokerEntry.getValue());
-            }
+            this.brokerAddrs = new HashMap<>(brokerData.brokerAddrs);
         }
         this.enableActingMaster = brokerData.enableActingMaster;
     }
@@ -82,14 +87,14 @@ public class BrokerData implements Comparable<BrokerData> {
      * @return Broker address.
      */
     public String selectBrokerAddr() {
-        String addr = this.brokerAddrs.get(MixAll.MASTER_ID);
+        String masterAddress = this.brokerAddrs.get(MixAll.MASTER_ID);
 
-        if (addr == null) {
-            List<String> addrs = new ArrayList<String>(brokerAddrs.values());
+        if (masterAddress == null) {
+            List<String> addrs = new ArrayList<>(brokerAddrs.values());
             return addrs.get(random.nextInt(addrs.size()));
         }
 
-        return addr;
+        return masterAddress;
     }
 
     public HashMap<Long, String> getBrokerAddrs() {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
index 6e9e653f1..fb55e22de 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-/**
- * $Id: QueueData.java 1835 2013-05-16 02:00:50Z [email protected] $
+/*
+  $Id: QueueData.java 1835 2013-05-16 02:00:50Z [email protected] $
  */
 package org.apache.rocketmq.common.protocol.route;
 
@@ -104,9 +104,7 @@ public class QueueData implements Comparable<QueueData> {
             return false;
         if (writeQueueNums != other.writeQueueNums)
             return false;
-        if (topicSysFlag != other.topicSysFlag)
-            return false;
-        return true;
+        return topicSysFlag == other.topicSysFlag;
     }
 
     @Override
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index 6d8e375dd..fa382296b 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -118,7 +118,7 @@ public class TopicRouteData extends RemotingSerializable {
         return topicRouteData;
     }
 
-    public boolean topicRouteDataIsChange(TopicRouteData oldData) {
+    public boolean topicRouteDataChanged(TopicRouteData oldData) {
         if (oldData == null)
             return true;
         TopicRouteData old = new TopicRouteData(oldData);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java 
b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 9c8bbf7da..67ceed520 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -52,7 +52,7 @@ public class ClientMetadata {
             return;
         }
         TopicRouteData old = this.topicRouteTable.get(topic);
-        if (!topicRouteData.topicRouteDataIsChange(old)) {
+        if (!topicRouteData.topicRouteDataChanged(old)) {
             return ;
         }
         {
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 04abae285..cec567a5c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -63,22 +63,18 @@ public class NamesrvController {
     private final NettyClientConfig nettyClientConfig;
 
     private final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(1,
-        new BasicThreadFactory.Builder()
-            .namingPattern("NSScheduledThread")
-            .daemon(true)
-            .build());
+            new 
BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build());
+
     private final ScheduledExecutorService scanExecutorService = new 
ScheduledThreadPoolExecutor(1,
-        new BasicThreadFactory.Builder()
-            .namingPattern("NSScanScheduledThread")
-            .daemon(true)
-            .build());
+            new 
BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build());
+
     private final KVConfigManager kvConfigManager;
     private final RouteInfoManager routeInfoManager;
 
     private RemotingClient remotingClient;
     private RemotingServer remotingServer;
 
-    private BrokerHousekeepingService brokerHousekeepingService;
+    private final BrokerHousekeepingService brokerHousekeepingService;
 
     private ExecutorService defaultExecutor;
     private ExecutorService clientRequestExecutor;
@@ -86,7 +82,7 @@ public class NamesrvController {
     private BlockingQueue<Runnable> defaultThreadPoolQueue;
     private BlockingQueue<Runnable> clientRequestThreadPoolQueue;
 
-    private Configuration configuration;
+    private final Configuration configuration;
     private FileWatchService fileWatchService;
 
     public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig 
nettyServerConfig) {
@@ -100,113 +96,103 @@ public class NamesrvController {
         this.kvConfigManager = new KVConfigManager(this);
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
         this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
-        this.configuration = new Configuration(
-            LOGGER,
-            this.namesrvConfig, this.nettyServerConfig
-        );
+        this.configuration = new Configuration(LOGGER, this.namesrvConfig, 
this.nettyServerConfig);
         this.configuration.setStorePathFromConfig(this.namesrvConfig, 
"configStorePath");
     }
 
     public boolean initialize() {
+        loadConfig();
+        initiateNetworkComponents();
+        initiateThreadExecutors();
+        registerProcessor();
+        startScheduleService();
+        initiateSslContext();
+        initiateRpcHooks();
+        return true;
+    }
 
+    private void loadConfig() {
         this.kvConfigManager.load();
+    }
+
+    private void startScheduleService() {
+        
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
+            5, this.namesrvConfig.getScanNotActiveBrokerInterval(), 
TimeUnit.MILLISECONDS);
+
+        
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
+            1, 10, TimeUnit.MINUTES);
 
+        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                NamesrvController.this.printWaterMark();
+            } catch (Throwable e) {
+                LOGGER.error("printWaterMark error.", e);
+            }
+        }, 10, 1, TimeUnit.SECONDS);
+    }
+
+    private void initiateNetworkComponents() {
         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, 
this.brokerHousekeepingService);
+        this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
+    }
 
+    private void initiateThreadExecutors() {
         this.defaultThreadPoolQueue = new 
LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
-        this.defaultExecutor = new ThreadPoolExecutor(
-            this.namesrvConfig.getDefaultThreadPoolNums(),
-            this.namesrvConfig.getDefaultThreadPoolNums(),
-            1000 * 60,
-            TimeUnit.MILLISECONDS,
-            this.defaultThreadPoolQueue,
-            new ThreadFactoryImpl("RemotingExecutorThread_")) {
+        this.defaultExecutor = new 
ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), 
this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, 
TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new 
ThreadFactoryImpl("RemotingExecutorThread_")) {
             @Override
             protected <T> RunnableFuture<T> newTaskFor(final Runnable 
runnable, final T value) {
-                return new FutureTaskExt<T>(runnable, value);
+                return new FutureTaskExt<>(runnable, value);
             }
         };
 
         this.clientRequestThreadPoolQueue = new 
LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
-        this.clientRequestExecutor = new ThreadPoolExecutor(
-            this.namesrvConfig.getClientRequestThreadPoolNums(),
-            this.namesrvConfig.getClientRequestThreadPoolNums(),
-            1000 * 60,
-            TimeUnit.MILLISECONDS,
-            this.clientRequestThreadPoolQueue,
-            new ThreadFactoryImpl("ClientRequestExecutorThread_")) {
+        this.clientRequestExecutor = new 
ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), 
this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, 
TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new 
ThreadFactoryImpl("ClientRequestExecutorThread_")) {
             @Override
             protected <T> RunnableFuture<T> newTaskFor(final Runnable 
runnable, final T value) {
-                return new FutureTaskExt<T>(runnable, value);
+                return new FutureTaskExt<>(runnable, value);
             }
         };
-        this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
+    }
 
-        this.registerProcessor();
+    private void initiateSslContext() {
+        if (TlsSystemConfig.tlsMode == TlsMode.DISABLED) {
+            return;
+        }
 
-        
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
-            5, this.namesrvConfig.getScanNotActiveBrokerInterval(), 
TimeUnit.MILLISECONDS);
+        String[] watchFiles = {TlsSystemConfig.tlsServerCertPath, 
TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath};
 
-        
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
-            1, 10, TimeUnit.MINUTES);
+        FileWatchService.Listener listener = new FileWatchService.Listener() {
+            boolean certChanged, keyChanged = false;
 
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-            try {
-                NamesrvController.this.printWaterMark();
-            } catch (Throwable e) {
-                LOGGER.error("printWaterMark error.", e);
+            @Override
+            public void onChanged(String path) {
+                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+                    LOGGER.info("The trust certificate changed, reload the ssl 
context");
+                    ((NettyRemotingServer) remotingServer).loadSslContext();
+                }
+                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+                    certChanged = true;
+                }
+                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+                    keyChanged = true;
+                }
+                if (certChanged && keyChanged) {
+                    LOGGER.info("The certificate and private key changed, 
reload the ssl context");
+                    certChanged = keyChanged = false;
+                    ((NettyRemotingServer) remotingServer).loadSslContext();
+                }
             }
-        }, 10, 1, TimeUnit.SECONDS);
+        };
 
-        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
-            // Register a listener to reload SslContext
-            try {
-                fileWatchService = new FileWatchService(
-                    new String[] {
-                        TlsSystemConfig.tlsServerCertPath,
-                        TlsSystemConfig.tlsServerKeyPath,
-                        TlsSystemConfig.tlsServerTrustCertPath
-                    },
-                    new FileWatchService.Listener() {
-                        boolean certChanged, keyChanged = false;
-
-                        @Override
-                        public void onChanged(String path) {
-                            if 
(path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
-                                LOGGER.info("The trust certificate changed, 
reload the ssl context");
-                                reloadServerSslContext();
-                            }
-                            if 
(path.equals(TlsSystemConfig.tlsServerCertPath)) {
-                                certChanged = true;
-                            }
-                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) 
{
-                                keyChanged = true;
-                            }
-                            if (certChanged && keyChanged) {
-                                LOGGER.info("The certificate and private key 
changed, reload the ssl context");
-                                certChanged = keyChanged = false;
-                                reloadServerSslContext();
-                            }
-                        }
-
-                        private void reloadServerSslContext() {
-                            ((NettyRemotingServer) 
remotingServer).loadSslContext();
-                        }
-                    });
-            } catch (Exception e) {
-                LOGGER.warn("FileWatchService created error, can't load the 
certificate dynamically");
-            }
+        try {
+            fileWatchService = new FileWatchService(watchFiles, listener);
+        } catch (Exception e) {
+            LOGGER.warn("FileWatchService created error, can't load the 
certificate dynamically");
         }
-
-        initialRpcHooks();
-        return true;
     }
 
     private void printWaterMark() {
-        WATER_MARK_LOG.info("[WATERMARK] ClientQueueSize:{} 
ClientQueueSlowTime:{} " +
-                "DefaultQueueSize:{} DefaultQueueSlowTime:{}",
-            this.clientRequestThreadPoolQueue.size(), 
headSlowTimeMills(this.clientRequestThreadPoolQueue),
-            this.defaultThreadPoolQueue.size(), 
headSlowTimeMills(this.defaultThreadPoolQueue));
+        WATER_MARK_LOG.info("[WATERMARK] ClientQueueSize:{} 
ClientQueueSlowTime:{} " + "DefaultQueueSize:{} DefaultQueueSlowTime:{}", 
this.clientRequestThreadPoolQueue.size(), 
headSlowTimeMills(this.clientRequestThreadPoolQueue), 
this.defaultThreadPoolQueue.size(), 
headSlowTimeMills(this.defaultThreadPoolQueue));
     }
 
     private long headSlowTimeMills(BlockingQueue<Runnable> q) {
@@ -214,7 +200,7 @@ public class NamesrvController {
         final Runnable firstRunnable = q.peek();
 
         if (firstRunnable instanceof FutureTaskExt) {
-            final Runnable inner = ((FutureTaskExt) 
firstRunnable).getRunnable();
+            final Runnable inner = ((FutureTaskExt<?>) 
firstRunnable).getRunnable();
             if (inner instanceof RequestTask) {
                 slowTimeMills = System.currentTimeMillis() - ((RequestTask) 
inner).getCreateTimestamp();
             }
@@ -230,8 +216,7 @@ public class NamesrvController {
     private void registerProcessor() {
         if (namesrvConfig.isClusterTest()) {
 
-            this.remotingServer.registerDefaultProcessor(new 
ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
-                this.defaultExecutor);
+            this.remotingServer.registerDefaultProcessor(new 
ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), 
this.defaultExecutor);
         } else {
             // Support get route info only temporarily
             ClientRequestProcessor clientRequestProcessor = new 
ClientRequestProcessor(this);
@@ -241,10 +226,10 @@ public class NamesrvController {
         }
     }
 
-    private void initialRpcHooks() {
+    private void initiateRpcHooks() {
         this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
     }
-    
+
     public void start() throws Exception {
         this.remotingServer.start();
 
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 54a6320bf..d5f5d9762 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -19,8 +19,9 @@ package org.apache.rocketmq.namesrv;
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
 import java.io.BufferedInputStream;
-import java.io.FileInputStream;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import org.apache.commons.cli.CommandLine;
@@ -46,7 +47,6 @@ public class NamesrvStartup {
 
     private static InternalLogger log;
     private static Properties properties = null;
-    private static CommandLine commandLine = null;
     private static NamesrvConfig namesrvConfig = null;
     private static NettyServerConfig nettyServerConfig = null;
     private static NettyClientConfig nettyClientConfig = null;
@@ -57,29 +57,26 @@ public class NamesrvStartup {
         controllerManagerMain();
     }
 
-    public static NamesrvController main0(String[] args) {
+    public static void main0(String[] args) {
         try {
             parseCommandlineAndConfigFile(args);
-            NamesrvController controller = createAndStartNamesrvController();
-            return controller;
+            createAndStartNamesrvController();
         } catch (Throwable e) {
             e.printStackTrace();
             System.exit(-1);
         }
 
-        return null;
     }
 
-    public static ControllerManager controllerManagerMain() {
+    public static void controllerManagerMain() {
         try {
             if (namesrvConfig.isEnableControllerInNamesrv()) {
-                return createAndStartControllerManager();
+                createAndStartControllerManager();
             }
         } catch (Throwable e) {
             e.printStackTrace();
             System.exit(-1);
         }
-        return null;
     }
 
     public static void parseCommandlineAndConfigFile(String[] args) throws 
Exception {
@@ -87,7 +84,7 @@ public class NamesrvStartup {
         //PackageConflictDetect.detectFastjson();
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
-        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, 
buildCommandlineOptions(options), new PosixParser());
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, 
buildCommandlineOptions(options), new PosixParser());
         if (null == commandLine) {
             System.exit(-1);
             return;
@@ -101,7 +98,7 @@ public class NamesrvStartup {
         if (commandLine.hasOption('c')) {
             String file = commandLine.getOptionValue('c');
             if (file != null) {
-                InputStream in = new BufferedInputStream(new 
FileInputStream(file));
+                InputStream in = new 
BufferedInputStream(Files.newInputStream(Paths.get(file)));
                 properties = new Properties();
                 properties.load(in);
                 MixAll.properties2Object(properties, namesrvConfig);
@@ -144,14 +141,12 @@ public class NamesrvStartup {
 
     }
 
-    public static NamesrvController createAndStartNamesrvController() throws 
Exception {
-
+    public static void createAndStartNamesrvController() throws Exception {
         NamesrvController controller = createNamesrvController();
         start(controller);
         String tip = "The Name Server boot success. serializeType=" + 
RemotingCommand.getSerializeTypeConfigInThisServer();
         log.info(tip);
         System.out.printf("%s%n", tip);
-        return controller;
     }
 
     public static NamesrvController createNamesrvController() {
@@ -184,13 +179,12 @@ public class NamesrvStartup {
         return controller;
     }
 
-    public static ControllerManager createAndStartControllerManager() throws 
Exception {
+    public static void createAndStartControllerManager() throws Exception {
         ControllerManager controllerManager = createControllerManager();
         start(controllerManager);
         String tip = "The ControllerManager boot success. serializeType=" + 
RemotingCommand.getSerializeTypeConfigInThisServer();
         log.info(tip);
         System.out.printf("%s%n", tip);
-        return controllerManager;
     }
 
     public static ControllerManager createControllerManager() throws Exception 
{
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 05cff6595..3facb3cf8 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -122,86 +122,79 @@ public class RouteInfoManager {
             return;
         }
         try {
-            try {
-                this.lock.writeLock().lockInterruptibly();
-                if (this.topicQueueTable.containsKey(topic)) {
-                    log.info("Topic route already exist.{}, {}", topic, 
this.topicQueueTable.get(topic));
-                } else {
-                    // check and construct queue data map
-                    Map<String, QueueData> queueDataMap = new HashMap<>();
-                    for (QueueData queueData : queueDatas) {
-                        if 
(!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
-                            log.warn("Register topic contains illegal broker, 
{}, {}", topic, queueData);
-                            return;
-                        }
-                        queueDataMap.put(queueData.getBrokerName(), queueData);
+            this.lock.writeLock().lockInterruptibly();
+            if (this.topicQueueTable.containsKey(topic)) {
+                log.info("Topic route already exist.{}, {}", topic, 
this.topicQueueTable.get(topic));
+            } else {
+                // check and construct queue data map
+                Map<String, QueueData> queueDataMap = new HashMap<>();
+                for (QueueData queueData : queueDatas) {
+                    if 
(!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
+                        log.warn("Register topic contains illegal broker, {}, 
{}", topic, queueData);
+                        return;
                     }
-
-                    this.topicQueueTable.put(topic, queueDataMap);
-                    log.info("Register topic route:{}, {}", topic, queueDatas);
+                    queueDataMap.put(queueData.getBrokerName(), queueData);
                 }
-            } finally {
-                this.lock.writeLock().unlock();
+
+                this.topicQueueTable.put(topic, queueDataMap);
+                log.info("Register topic route:{}, {}", topic, queueDatas);
             }
         } catch (Exception e) {
             log.error("registerTopic Exception", e);
+        } finally {
+            this.lock.writeLock().unlock();
         }
     }
 
     public void deleteTopic(final String topic) {
         try {
-            try {
-                this.lock.writeLock().lockInterruptibly();
-                this.topicQueueTable.remove(topic);
-            } finally {
-                this.lock.writeLock().unlock();
-            }
+            this.lock.writeLock().lockInterruptibly();
+            this.topicQueueTable.remove(topic);
         } catch (Exception e) {
             log.error("deleteTopic Exception", e);
+        } finally {
+            this.lock.writeLock().unlock();
         }
     }
 
     public void deleteTopic(final String topic, final String clusterName) {
         try {
-            try {
-                this.lock.writeLock().lockInterruptibly();
-                Set<String> brokerNames = 
this.clusterAddrTable.get(clusterName);
-                if (brokerNames != null
-                    && !brokerNames.isEmpty()) {
-                    Map<String, QueueData> queueDataMap = 
this.topicQueueTable.get(topic);
-                    if (queueDataMap != null) {
-                        for (String brokerName : brokerNames) {
-                            final QueueData removedQD = 
queueDataMap.remove(brokerName);
-                            if (removedQD != null) {
-                                log.info("deleteTopic, remove one broker's 
topic {} {} {}", brokerName, topic,
-                                    removedQD);
-                            }
-                        }
-                        if (queueDataMap.isEmpty()) {
-                            log.info("deleteTopic, remove the topic all queue 
{} {}", clusterName, topic);
-                            this.topicQueueTable.remove(topic);
-                        }
+            this.lock.writeLock().lockInterruptibly();
+            //get all the brokerNames for the specified cluster
+            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
+            if (brokerNames == null || brokerNames.isEmpty()) {
+                return;
+            }
+            //get the store information for a single topic
+            Map<String, QueueData> queueDataMap = 
this.topicQueueTable.get(topic);
+            if (queueDataMap != null) {
+                for (String brokerName : brokerNames) {
+                    final QueueData removedQD = 
queueDataMap.remove(brokerName);
+                    if (removedQD != null) {
+                        log.info("deleteTopic, remove one broker's topic {} {} 
{}", brokerName, topic, removedQD);
                     }
                 }
-            } finally {
-                this.lock.writeLock().unlock();
+                if (queueDataMap.isEmpty()) {
+                    log.info("deleteTopic, remove the topic all queue {} {}", 
clusterName, topic);
+                    this.topicQueueTable.remove(topic);
+                }
             }
         } catch (Exception e) {
             log.error("deleteTopic Exception", e);
+        } finally {
+            this.lock.writeLock().unlock();
         }
     }
 
     public TopicList getAllTopicList() {
         TopicList topicList = new TopicList();
         try {
-            try {
-                this.lock.readLock().lockInterruptibly();
-                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
-            } finally {
-                this.lock.readLock().unlock();
-            }
+            this.lock.readLock().lockInterruptibly();
+            topicList.getTopicList().addAll(this.topicQueueTable.keySet());
         } catch (Exception e) {
             log.error("getAllTopicList Exception", e);
+        } finally {
+            this.lock.readLock().unlock();
         }
 
         return topicList;
@@ -235,148 +228,147 @@ public class RouteInfoManager {
         final Channel channel) {
         RegisterBrokerResult result = new RegisterBrokerResult();
         try {
-            try {
-                this.lock.writeLock().lockInterruptibly();
+            this.lock.writeLock().lockInterruptibly();
 
-                Set<String> brokerNames = 
this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
-                brokerNames.add(brokerName);
+            //init or update the cluster info
+            Set<String> brokerNames = 
this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
+            brokerNames.add(brokerName);
 
-                boolean registerFirst = false;
+            boolean registerFirst = false;
 
-                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
-                if (null == brokerData) {
-                    registerFirst = true;
-                    brokerData = new BrokerData(clusterName, brokerName, new 
HashMap<>());
-                    this.brokerAddrTable.put(brokerName, brokerData);
-                }
+            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+            if (null == brokerData) {
+                registerFirst = true;
+                brokerData = new BrokerData(clusterName, brokerName, new 
HashMap<>());
+                this.brokerAddrTable.put(brokerName, brokerData);
+            }
 
-                boolean isOldVersionBroker = enableActingMaster == null;
-                brokerData.setEnableActingMaster(isOldVersionBroker ? false : 
enableActingMaster);
-                brokerData.setZoneName(zoneName);
-                
-                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
+            boolean isOldVersionBroker = enableActingMaster == null;
+            brokerData.setEnableActingMaster(!isOldVersionBroker && 
enableActingMaster);
+            brokerData.setZoneName(zoneName);
 
-                boolean isMinBrokerIdChanged = false;
-                long prevMinBrokerId = 0;
-                if (!brokerAddrsMap.isEmpty()) {
-                    prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
-                }
+            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
 
-                if (brokerId < prevMinBrokerId) {
-                    isMinBrokerIdChanged = true;
-                }
+            boolean isMinBrokerIdChanged = false;
+            long prevMinBrokerId = 0;
+            if (!brokerAddrsMap.isEmpty()) {
+                prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
+            }
+
+            if (brokerId < prevMinBrokerId) {
+                isMinBrokerIdChanged = true;
+            }
 
-                //Switch slave to master: first remove <1, IP:PORT> in 
namesrv, then add <0, IP:PORT>
-                //The same IP:PORT must only have one record in brokerAddrTable
-                brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr 
&& brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
-
-                //If Local brokerId stateVersion bigger than the registering 
one,
-                String oldBrokerAddr = brokerAddrsMap.get(brokerId);
-                if (null != oldBrokerAddr && 
!oldBrokerAddr.equals(brokerAddr)) {
-                    BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new 
BrokerAddrInfo(clusterName, oldBrokerAddr));
-
-                    if (null != oldBrokerInfo) {
-                        long oldStateVersion = 
oldBrokerInfo.getDataVersion().getStateVersion();
-                        long newStateVersion = 
topicConfigWrapper.getDataVersion().getStateVersion();
-                        if (oldStateVersion > newStateVersion) {
-                            log.warn("Registered Broker conflicts with the 
existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
-                                    "Old BrokerAddr:{}, Old Version:{}, New 
BrokerAddr:{}, New Version:{}.",
+            //Switch slave to master: first remove <1, IP:PORT> in namesrv, 
then add <0, IP:PORT>
+            //The same IP:PORT must only have one record in brokerAddrTable
+            brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && 
brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
+
+            //If the existing broker's stateVersion is greater than the registering one, ignore this registration.
+            String oldBrokerAddr = brokerAddrsMap.get(brokerId);
+            if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
+                BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new 
BrokerAddrInfo(clusterName, oldBrokerAddr));
+
+                if (null != oldBrokerInfo) {
+                    long oldStateVersion = 
oldBrokerInfo.getDataVersion().getStateVersion();
+                    long newStateVersion = 
topicConfigWrapper.getDataVersion().getStateVersion();
+                    if (oldStateVersion > newStateVersion) {
+                        log.warn("Registered Broker conflicts with the existed 
one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
+                                        "Old BrokerAddr:{}, Old Version:{}, 
New BrokerAddr:{}, New Version:{}.",
                                 clusterName, brokerName, brokerId, 
oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
-                            //Remove the rejected brokerAddr from 
brokerLiveTable.
-                            brokerLiveTable.remove(new 
BrokerAddrInfo(clusterName, brokerAddr));
-                            return result;
-                        }
+                        //Remove the rejected brokerAddr from brokerLiveTable.
+                        brokerLiveTable.remove(new BrokerAddrInfo(clusterName, 
brokerAddr));
+                        return result;
                     }
                 }
+            }
 
-                if (!brokerAddrsMap.containsKey(brokerId) && 
topicConfigWrapper.getTopicConfigTable().size() == 1) {
-                    log.warn("Can't register topicConfigWrapper={} because 
broker[{}]={} has not registered.",
-                            topicConfigWrapper.getTopicConfigTable(), 
brokerId, brokerAddr);
-                    return null;
-                }
+            if (!brokerAddrsMap.containsKey(brokerId) && 
topicConfigWrapper.getTopicConfigTable().size() == 1) {
+                log.warn("Can't register topicConfigWrapper={} because 
broker[{}]={} has not registered.",
+                        topicConfigWrapper.getTopicConfigTable(), brokerId, 
brokerAddr);
+                return null;
+            }
 
-                String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
-                registerFirst = registerFirst || 
(StringUtils.isEmpty(oldAddr));
+            String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
+            registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
 
-                boolean isMaster = MixAll.MASTER_ID == brokerId;
-                boolean isPrimeSlave = !isOldVersionBroker && !isMaster
+            boolean isMaster = MixAll.MASTER_ID == brokerId;
+            boolean isPrimeSlave = !isOldVersionBroker && !isMaster
                     && brokerId == Collections.min(brokerAddrsMap.keySet());
 
-                if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
+            if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
 
-                    ConcurrentMap<String, TopicConfig> tcTable =
+                ConcurrentMap<String, TopicConfig> tcTable =
                         topicConfigWrapper.getTopicConfigTable();
-                    if (tcTable != null) {
-                        for (Map.Entry<String, TopicConfig> entry : 
tcTable.entrySet()) {
-                            if (registerFirst || 
this.isTopicConfigChanged(clusterName, brokerAddr,
+                if (tcTable != null) {
+                    for (Map.Entry<String, TopicConfig> entry : 
tcTable.entrySet()) {
+                        if (registerFirst || 
this.isTopicConfigChanged(clusterName, brokerAddr,
                                 topicConfigWrapper.getDataVersion(), 
brokerName,
                                 entry.getValue().getTopicName())) {
-                                final TopicConfig topicConfig = 
entry.getValue();
-                                if (isPrimeSlave) {
-                                    // Wipe write perm for prime slave
-                                    topicConfig.setPerm(topicConfig.getPerm() 
& (~PermName.PERM_WRITE));
-                                }
-                                this.createAndUpdateQueueData(brokerName, 
topicConfig);
+                            final TopicConfig topicConfig = entry.getValue();
+                            if (isPrimeSlave) {
+                                // Wipe write perm for prime slave
+                                topicConfig.setPerm(topicConfig.getPerm() & 
(~PermName.PERM_WRITE));
                             }
+                            this.createAndUpdateQueueData(brokerName, 
topicConfig);
                         }
                     }
+                }
 
-                    if (this.isBrokerTopicConfigChanged(clusterName, 
brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
-                        TopicConfigAndMappingSerializeWrapper 
mappingSerializeWrapper = 
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
-                        Map<String, TopicQueueMappingInfo> 
topicQueueMappingInfoMap = 
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
-                        //the topicQueueMappingInfoMap should never be null, 
but can be empty
-                        for (Map.Entry<String, TopicQueueMappingInfo> entry : 
topicQueueMappingInfoMap.entrySet()) {
-                            if 
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
-                                topicQueueMappingInfoTable.put(entry.getKey(), 
new HashMap<String, TopicQueueMappingInfo>());
-                            }
-                            //Note asset brokerName equal 
entry.getValue().getBname()
-                            //here use the mappingDetail.bname
-                            
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), 
entry.getValue());
+                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, 
topicConfigWrapper.getDataVersion()) || registerFirst) {
+                    TopicConfigAndMappingSerializeWrapper 
mappingSerializeWrapper = 
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+                    Map<String, TopicQueueMappingInfo> 
topicQueueMappingInfoMap = 
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+                    //the topicQueueMappingInfoMap should never be null, but 
can be empty
+                    for (Map.Entry<String, TopicQueueMappingInfo> entry : 
topicQueueMappingInfoMap.entrySet()) {
+                        if 
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+                            topicQueueMappingInfoTable.put(entry.getKey(), new 
HashMap<>());
                         }
+                        //Note assert brokerName equal 
entry.getValue().getBname()
+                        //here use the mappingDetail.bname
+                        
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), 
entry.getValue());
                     }
                 }
+            }
 
-                BrokerAddrInfo brokerAddrInfo = new 
BrokerAddrInfo(clusterName, brokerAddr);
-                BrokerLiveInfo prevBrokerLiveInfo = 
this.brokerLiveTable.put(brokerAddrInfo,
+            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, 
brokerAddr);
+            BrokerLiveInfo prevBrokerLiveInfo = 
this.brokerLiveTable.put(brokerAddrInfo,
                     new BrokerLiveInfo(
-                        System.currentTimeMillis(),
-                        timeoutMillis == null ? 
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
-                        topicConfigWrapper == null ? new DataVersion() : 
topicConfigWrapper.getDataVersion(),
-                        channel,
-                        haServerAddr));
-                if (null == prevBrokerLiveInfo) {
-                    log.info("new broker registered, {} HAService: {}", 
brokerAddrInfo, haServerAddr);
-                }
+                            System.currentTimeMillis(),
+                            timeoutMillis == null ? 
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+                            topicConfigWrapper == null ? new DataVersion() : 
topicConfigWrapper.getDataVersion(),
+                            channel,
+                            haServerAddr));
+            if (null == prevBrokerLiveInfo) {
+                log.info("new broker registered, {} HAService: {}", 
brokerAddrInfo, haServerAddr);
+            }
 
-                if (filterServerList != null) {
-                    if (filterServerList.isEmpty()) {
-                        this.filterServerTable.remove(brokerAddrInfo);
-                    } else {
-                        this.filterServerTable.put(brokerAddrInfo, 
filterServerList);
-                    }
+            if (filterServerList != null) {
+                if (filterServerList.isEmpty()) {
+                    this.filterServerTable.remove(brokerAddrInfo);
+                } else {
+                    this.filterServerTable.put(brokerAddrInfo, 
filterServerList);
                 }
+            }
 
-                if (MixAll.MASTER_ID != brokerId) {
-                    String masterAddr = 
brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
-                    if (masterAddr != null) {
-                        BrokerAddrInfo masterAddrInfo = new 
BrokerAddrInfo(clusterName, masterAddr);
-                        BrokerLiveInfo masterLiveInfo = 
this.brokerLiveTable.get(masterAddrInfo);
-                        if (masterLiveInfo != null) {
-                            
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
-                            result.setMasterAddr(masterAddr);
-                        }
+            if (MixAll.MASTER_ID != brokerId) {
+                String masterAddr = 
brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                if (masterAddr != null) {
+                    BrokerAddrInfo masterAddrInfo = new 
BrokerAddrInfo(clusterName, masterAddr);
+                    BrokerLiveInfo masterLiveInfo = 
this.brokerLiveTable.get(masterAddrInfo);
+                    if (masterLiveInfo != null) {
+                        
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
+                        result.setMasterAddr(masterAddr);
                     }
                 }
+            }
 
-                if (isMinBrokerIdChanged && 
namesrvConfig.isNotifyMinBrokerIdChanged()) {
-                    notifyMinBrokerIdChanged(brokerAddrsMap, null,
+            if (isMinBrokerIdChanged && 
namesrvConfig.isNotifyMinBrokerIdChanged()) {
+                notifyMinBrokerIdChanged(brokerAddrsMap, null,
                         
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
-                }
-            } finally {
-                this.lock.writeLock().unlock();
             }
         } catch (Exception e) {
             log.error("registerBroker Exception", e);
+        } finally {
+            this.lock.writeLock().unlock();
         }
 
         return result;
@@ -417,12 +409,8 @@ public class RouteInfoManager {
             return true;
         }
 
-        if (queueDataMap.containsKey(brokerName)) {
-            // The topicQueueTable already contains the broker
-            return false;
-        }
-
-        return true;
+        // The topicQueueTable already contains the broker
+        return !queueDataMap.containsKey(brokerName);
     }
 
     public DataVersion queryBrokerTopicConfig(final String clusterName, final 
String brokerAddr) {
@@ -500,25 +488,24 @@ public class RouteInfoManager {
     private int operateWritePermOfBroker(final String brokerName, final int 
requestCode) {
         int topicCnt = 0;
 
-        Iterator<Entry<String, Map<String, QueueData>>> itTopic = 
this.topicQueueTable.entrySet().iterator();
-        while (itTopic.hasNext()) {
-            Entry<String, Map<String, QueueData>> entry = itTopic.next();
+        for (Entry<String, Map<String, QueueData>> entry : 
this.topicQueueTable.entrySet()) {
             Map<String, QueueData> qdMap = entry.getValue();
 
             final QueueData qd = qdMap.get(brokerName);
-            if (qd != null) {
-                int perm = qd.getPerm();
-                switch (requestCode) {
-                    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
-                        perm &= ~PermName.PERM_WRITE;
-                        break;
-                    case RequestCode.ADD_WRITE_PERM_OF_BROKER:
-                        perm = PermName.PERM_READ | PermName.PERM_WRITE;
-                        break;
-                }
-                qd.setPerm(perm);
-                topicCnt++;
+            if (qd == null) {
+                continue;
+            }
+            int perm = qd.getPerm();
+            switch (requestCode) {
+                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
+                    perm &= ~PermName.PERM_WRITE;
+                    break;
+                case RequestCode.ADD_WRITE_PERM_OF_BROKER:
+                    perm = PermName.PERM_READ | PermName.PERM_WRITE;
+                    break;
             }
+            qd.setPerm(perm);
+            topicCnt++;
         }
         return topicCnt;
     }
@@ -671,7 +658,6 @@ public class RouteInfoManager {
         TopicRouteData topicRouteData = new TopicRouteData();
         boolean foundQueueData = false;
         boolean foundBrokerData = false;
-        Set<String> brokerNameSet = new HashSet<>();
         List<BrokerData> brokerDataList = new LinkedList<>();
         topicRouteData.setBrokerDatas(brokerDataList);
 
@@ -679,40 +665,41 @@ public class RouteInfoManager {
         topicRouteData.setFilterServerTable(filterServerMap);
 
         try {
-            try {
-                this.lock.readLock().lockInterruptibly();
-                Map<String, QueueData> queueDataMap = 
this.topicQueueTable.get(topic);
-                if (queueDataMap != null) {
-                    topicRouteData.setQueueDatas(new 
ArrayList<>(queueDataMap.values()));
-                    foundQueueData = true;
-
-                    brokerNameSet.addAll(queueDataMap.keySet());
-
-                    for (String brokerName : brokerNameSet) {
-                        BrokerData brokerData = 
this.brokerAddrTable.get(brokerName);
-                        if (null != brokerData) {
-                            BrokerData brokerDataClone = new 
BrokerData(brokerData.getCluster(), 
-                                brokerData.getBrokerName(),
-                                (HashMap<Long, String>) 
brokerData.getBrokerAddrs().clone(),
-                                brokerData.isEnableActingMaster(), 
brokerData.getZoneName());
-
-                            brokerDataList.add(brokerDataClone);
-                            foundBrokerData = true;
-                            if (!filterServerTable.isEmpty()) {
-                                for (final String brokerAddr : 
brokerDataClone.getBrokerAddrs().values()) {
-                                    BrokerAddrInfo brokerAddrInfo = new 
BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
-                                    List<String> filterServerList = 
this.filterServerTable.get(brokerAddrInfo);
-                                    filterServerMap.put(brokerAddr, 
filterServerList);
-                                }
-                            }
-                        }
+            this.lock.readLock().lockInterruptibly();
+            Map<String, QueueData> queueDataMap = 
this.topicQueueTable.get(topic);
+            if (queueDataMap != null) {
+                topicRouteData.setQueueDatas(new 
ArrayList<>(queueDataMap.values()));
+                foundQueueData = true;
+
+                Set<String> brokerNameSet = new 
HashSet<>(queueDataMap.keySet());
+
+                for (String brokerName : brokerNameSet) {
+                    BrokerData brokerData = 
this.brokerAddrTable.get(brokerName);
+                    if (null == brokerData) {
+                        continue;
+                    }
+                    BrokerData brokerDataClone = new 
BrokerData(brokerData.getCluster(),
+                            brokerData.getBrokerName(),
+                            (HashMap<Long, String>) 
brokerData.getBrokerAddrs().clone(),
+                            brokerData.isEnableActingMaster(), 
brokerData.getZoneName());
+
+                    brokerDataList.add(brokerDataClone);
+                    foundBrokerData = true;
+                    if (filterServerTable.isEmpty()) {
+                        continue;
                     }
+                    for (final String brokerAddr : 
brokerDataClone.getBrokerAddrs().values()) {
+                        BrokerAddrInfo brokerAddrInfo = new 
BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
+                        List<String> filterServerList = 
this.filterServerTable.get(brokerAddrInfo);
+                        filterServerMap.put(brokerAddr, filterServerList);
+                    }
+
                 }
-            } finally {
-                this.lock.readLock().unlock();
             }
         } catch (Exception e) {
             log.error("pickupTopicRouteData Exception", e);
+        } finally {
+            this.lock.readLock().unlock();
         }
 
         log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
@@ -937,36 +924,28 @@ public class RouteInfoManager {
                 
log.info("--------------------------------------------------------");
                 {
                     log.info("topicQueueTable SIZE: {}", 
this.topicQueueTable.size());
-                    Iterator<Entry<String, Map<String, QueueData>>> it = 
this.topicQueueTable.entrySet().iterator();
-                    while (it.hasNext()) {
-                        Entry<String, Map<String, QueueData>> next = it.next();
+                    for (Entry<String, Map<String, QueueData>> next : 
this.topicQueueTable.entrySet()) {
                         log.info("topicQueueTable Topic: {} {}", 
next.getKey(), next.getValue());
                     }
                 }
 
                 {
                     log.info("brokerAddrTable SIZE: {}", 
this.brokerAddrTable.size());
-                    Iterator<Entry<String, BrokerData>> it = 
this.brokerAddrTable.entrySet().iterator();
-                    while (it.hasNext()) {
-                        Entry<String, BrokerData> next = it.next();
+                    for (Entry<String, BrokerData> next : 
this.brokerAddrTable.entrySet()) {
                         log.info("brokerAddrTable brokerName: {} {}", 
next.getKey(), next.getValue());
                     }
                 }
 
                 {
                     log.info("brokerLiveTable SIZE: {}", 
this.brokerLiveTable.size());
-                    Iterator<Entry<BrokerAddrInfo, BrokerLiveInfo>> it = 
this.brokerLiveTable.entrySet().iterator();
-                    while (it.hasNext()) {
-                        Entry<BrokerAddrInfo, BrokerLiveInfo> next = it.next();
+                    for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : 
this.brokerLiveTable.entrySet()) {
                         log.info("brokerLiveTable brokerAddr: {} {}", 
next.getKey(), next.getValue());
                     }
                 }
 
                 {
                     log.info("clusterAddrTable SIZE: {}", 
this.clusterAddrTable.size());
-                    Iterator<Entry<String, Set<String>>> it = 
this.clusterAddrTable.entrySet().iterator();
-                    while (it.hasNext()) {
-                        Entry<String, Set<String>> next = it.next();
+                    for (Entry<String, Set<String>> next : 
this.clusterAddrTable.entrySet()) {
                         log.info("clusterAddrTable clusterName: {} {}", 
next.getKey(), next.getValue());
                     }
                 }
@@ -981,30 +960,27 @@ public class RouteInfoManager {
     public TopicList getSystemTopicList() {
         TopicList topicList = new TopicList();
         try {
-            try {
-                this.lock.readLock().lockInterruptibly();
-                for (Map.Entry<String, Set<String>> entry : 
clusterAddrTable.entrySet()) {
-                    topicList.getTopicList().add(entry.getKey());
-                    topicList.getTopicList().addAll(entry.getValue());
-                }
+            this.lock.readLock().lockInterruptibly();
+            for (Map.Entry<String, Set<String>> entry : 
clusterAddrTable.entrySet()) {
+                topicList.getTopicList().add(entry.getKey());
+                topicList.getTopicList().addAll(entry.getValue());
+            }
 
-                if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {
-                    Iterator<String> it = brokerAddrTable.keySet().iterator();
-                    while (it.hasNext()) {
-                        BrokerData bd = brokerAddrTable.get(it.next());
-                        HashMap<Long, String> brokerAddrs = 
bd.getBrokerAddrs();
-                        if (brokerAddrs != null && !brokerAddrs.isEmpty()) {
-                            Iterator<Long> it2 = 
brokerAddrs.keySet().iterator();
-                            
topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
-                            break;
-                        }
+            if (!brokerAddrTable.isEmpty()) {
+                for (String s : brokerAddrTable.keySet()) {
+                    BrokerData bd = brokerAddrTable.get(s);
+                    HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();
+                    if (brokerAddrs != null && !brokerAddrs.isEmpty()) {
+                        Iterator<Long> it2 = brokerAddrs.keySet().iterator();
+                        topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
+                        break;
                     }
                 }
-            } finally {
-                this.lock.readLock().unlock();
             }
         } catch (Exception e) {
             log.error("getSystemTopicList Exception", e);
+        } finally {
+            this.lock.readLock().unlock();
         }
 
         return topicList;
@@ -1042,24 +1018,19 @@ public class RouteInfoManager {
     public TopicList getUnitTopics() {
         TopicList topicList = new TopicList();
         try {
-            try {
-                this.lock.readLock().lockInterruptibly();
-                Iterator<Entry<String, Map<String, QueueData>>> topicTableIt =
-                    this.topicQueueTable.entrySet().iterator();
-                while (topicTableIt.hasNext()) {
-                    Entry<String, Map<String, QueueData>> topicEntry = 
topicTableIt.next();
-                    String topic = topicEntry.getKey();
-                    Map<String, QueueData> queueDatas = topicEntry.getValue();
-                    if (queueDatas != null && queueDatas.size() > 0
+            this.lock.readLock().lockInterruptibly();
+            for (Entry<String, Map<String, QueueData>> topicEntry : 
this.topicQueueTable.entrySet()) {
+                String topic = topicEntry.getKey();
+                Map<String, QueueData> queueDatas = topicEntry.getValue();
+                if (queueDatas != null && queueDatas.size() > 0
                         && 
TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
 {
-                        topicList.getTopicList().add(topic);
-                    }
+                    topicList.getTopicList().add(topic);
                 }
-            } finally {
-                this.lock.readLock().unlock();
             }
         } catch (Exception e) {
             log.error("getUnitTopics Exception", e);
+        } finally {
+            this.lock.readLock().unlock();
         }
 
         return topicList;
@@ -1068,24 +1039,19 @@ public class RouteInfoManager {
     public TopicList getHasUnitSubTopicList() {
         TopicList topicList = new TopicList();
         try {
-            try {
-                this.lock.readLock().lockInterruptibly();
-                Iterator<Entry<String, Map<String, QueueData>>> topicTableIt =
-                    this.topicQueueTable.entrySet().iterator();
-                while (topicTableIt.hasNext()) {
-                    Entry<String, Map<String, QueueData>> topicEntry = 
topicTableIt.next();
-                    String topic = topicEntry.getKey();
-                    Map<String, QueueData> queueDatas = topicEntry.getValue();
-                    if (queueDatas != null && queueDatas.size() > 0
+            this.lock.readLock().lockInterruptibly();
+            for (Entry<String, Map<String, QueueData>> topicEntry : 
this.topicQueueTable.entrySet()) {
+                String topic = topicEntry.getKey();
+                Map<String, QueueData> queueDatas = topicEntry.getValue();
+                if (queueDatas != null && queueDatas.size() > 0
                         && 
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
 {
-                        topicList.getTopicList().add(topic);
-                    }
+                    topicList.getTopicList().add(topic);
                 }
-            } finally {
-                this.lock.readLock().unlock();
             }
         } catch (Exception e) {
             log.error("getHasUnitSubTopicList Exception", e);
+        } finally {
+            this.lock.readLock().unlock();
         }
 
         return topicList;
@@ -1094,25 +1060,20 @@ public class RouteInfoManager {
     public TopicList getHasUnitSubUnUnitTopicList() {
         TopicList topicList = new TopicList();
         try {
-            try {
-                this.lock.readLock().lockInterruptibly();
-                Iterator<Entry<String, Map<String, QueueData>>> topicTableIt =
-                    this.topicQueueTable.entrySet().iterator();
-                while (topicTableIt.hasNext()) {
-                    Entry<String, Map<String, QueueData>> topicEntry = 
topicTableIt.next();
-                    String topic = topicEntry.getKey();
-                    Map<String, QueueData> queueDatas = topicEntry.getValue();
-                    if (queueDatas != null && queueDatas.size() > 0
+            this.lock.readLock().lockInterruptibly();
+            for (Entry<String, Map<String, QueueData>> topicEntry : 
this.topicQueueTable.entrySet()) {
+                String topic = topicEntry.getKey();
+                Map<String, QueueData> queueDatas = topicEntry.getValue();
+                if (queueDatas != null && queueDatas.size() > 0
                         && 
!TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
                         && 
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
 {
-                        topicList.getTopicList().add(topic);
-                    }
+                    topicList.getTopicList().add(topic);
                 }
-            } finally {
-                this.lock.readLock().unlock();
             }
         } catch (Exception e) {
             log.error("getHasUnitSubUnUnitTopicList Exception", e);
+        } finally {
+            this.lock.readLock().unlock();
         }
 
         return topicList;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 7a164ab08..b5569bb97 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.ssl.SslContext;
@@ -73,14 +72,14 @@ public abstract class NettyRemotingAbstract {
      * This map caches all on-going requests.
      */
     protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> 
responseTable =
-        new ConcurrentHashMap<Integer, ResponseFuture>(256);
+            new ConcurrentHashMap<>(256);
 
     /**
      * This container holds all processors per request code, aka, for each 
incoming request, we may look up the
      * responding processor in this map to handle the request.
      */
     protected final HashMap<Integer/* request code */, 
Pair<NettyRequestProcessor, ExecutorService>> processorTable =
-        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+            new HashMap<>(64);
 
     /**
      * Executor to feed netty events to user defined {@link 
ChannelEventListener}.
@@ -90,7 +89,7 @@ public abstract class NettyRemotingAbstract {
     /**
      * The default request processor to use in case there is no exact match in 
{@link #processorTable} per request code.
      */
-    protected Pair<NettyRequestProcessor, ExecutorService> 
defaultRequestProcessor;
+    protected Pair<NettyRequestProcessor, ExecutorService> 
defaultRequestProcessorPair;
 
     /**
      * SSL context via which to create {@link SslHandler}.
@@ -100,7 +99,7 @@ public abstract class NettyRemotingAbstract {
     /**
      * custom rpc hooks
      */
-    protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
+    protected List<RPCHook> rpcHooks = new ArrayList<>();
 
     static {
         NettyLogger.initNettyLogger();
@@ -147,17 +146,15 @@ public abstract class NettyRemotingAbstract {
      *
      * @param ctx Channel handler context.
      * @param msg incoming remoting command.
-     * @throws Exception if there were any error while processing the incoming 
command.
      */
-    public void processMessageReceived(ChannelHandlerContext ctx, 
RemotingCommand msg) throws Exception {
-        final RemotingCommand cmd = msg;
-        if (cmd != null) {
-            switch (cmd.getType()) {
+    public void processMessageReceived(ChannelHandlerContext ctx, 
RemotingCommand msg) {
+        if (msg != null) {
+            switch (msg.getType()) {
                 case REQUEST_COMMAND:
-                    processRequestCommand(ctx, cmd);
+                    processRequestCommand(ctx, msg);
                     break;
                 case RESPONSE_COMMAND:
-                    processResponseCommand(ctx, cmd);
+                    processResponseCommand(ctx, msg);
                     break;
                 default:
                     break;
@@ -189,103 +186,104 @@ public abstract class NettyRemotingAbstract {
      */
     public void processRequestCommand(final ChannelHandlerContext ctx, final 
RemotingCommand cmd) {
         final Pair<NettyRequestProcessor, ExecutorService> matched = 
this.processorTable.get(cmd.getCode());
-        final Pair<NettyRequestProcessor, ExecutorService> pair = null == 
matched ? this.defaultRequestProcessor : matched;
+        final Pair<NettyRequestProcessor, ExecutorService> pair = null == 
matched ? this.defaultRequestProcessorPair : matched;
         final int opaque = cmd.getOpaque();
 
-        if (pair != null) {
-            Runnable run = new Runnable() {
-                @Override
-                public void run() {
-                    Exception exception = null;
-                    RemotingCommand response;
-
-                    try {
-                        String remoteAddr = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                        try {
-                            doBeforeRpcHooks(remoteAddr, cmd);
-                        } catch (Exception e) {
-                            exception = e;
-                        }
-
-                        if (exception == null) {
-                            response = pair.getObject1().processRequest(ctx, 
cmd);
-                        } else {
-                            response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, 
null);
-                        }
-
-                        try {
-                            doAfterRpcHooks(remoteAddr, cmd, response);
-                        } catch (Exception e) {
-                            exception = e;
-                        }
+        if (pair == null) {
+            String error = " request type " + cmd.getCode() + " not supported";
+            final RemotingCommand response =
+                
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
 error);
+            response.setOpaque(opaque);
+            ctx.writeAndFlush(response);
+            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + 
error);
+            return;
+        }
 
-                        if (exception != null) {
-                            throw exception;
-                        }
+        Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);
 
-                        if (!cmd.isOnewayRPC()) {
-                            if (response != null) {
-                                response.setOpaque(opaque);
-                                response.markResponseType();
-                                try {
-                                    ctx.writeAndFlush(response);
-                                } catch (Throwable e) {
-                                    log.error("process request over, but 
response failed", e);
-                                    log.error(cmd.toString());
-                                    log.error(response.toString());
-                                }
-                            } else {
-
-                            }
-                        }
-                    } catch (Throwable e) {
-                        log.error("process request exception", e);
-                        log.error(cmd.toString());
+        if (pair.getObject1().rejectRequest()) {
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+                    "[REJECTREQUEST]system busy, start flow control for a 
while");
+            response.setOpaque(opaque);
+            ctx.writeAndFlush(response);
+            return;
+        }
 
-                        if (!cmd.isOnewayRPC()) {
-                            response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
-                                RemotingHelper.exceptionSimpleDesc(e));
-                            response.setOpaque(opaque);
-                            ctx.writeAndFlush(response);
-                        }
-                    }
-                }
-            };
+        try {
+            final RequestTask requestTask = new RequestTask(run, 
ctx.channel(), cmd);
+            //async execute task, current thread return directly
+            pair.getObject2().submit(requestTask);
+        } catch (RejectedExecutionException e) {
+            if ((System.currentTimeMillis() % 10000) == 0) {
+                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+                        + ", too many requests and system thread pool busy, 
RejectedExecutionException "
+                        + pair.getObject2().toString()
+                        + " request code: " + cmd.getCode());
+            }
 
-            if (pair.getObject1().rejectRequest()) {
+            if (!cmd.isOnewayRPC()) {
                 final RemotingCommand response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
-                    "[REJECTREQUEST]system busy, start flow control for a 
while");
+                        "[OVERLOAD]system busy, start flow control for a 
while");
                 response.setOpaque(opaque);
                 ctx.writeAndFlush(response);
-                return;
             }
+        }
+    }
+
+    private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, 
RemotingCommand cmd, Pair<NettyRequestProcessor, ExecutorService> pair, int 
opaque) {
+        return () -> {
+            Exception exception = null;
+            RemotingCommand response;
 
             try {
-                final RequestTask requestTask = new RequestTask(run, 
ctx.channel(), cmd);
-                pair.getObject2().submit(requestTask);
-            } catch (RejectedExecutionException e) {
-                if ((System.currentTimeMillis() % 10000) == 0) {
-                    
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
-                        + ", too many requests and system thread pool busy, 
RejectedExecutionException "
-                        + pair.getObject2().toString()
-                        + " request code: " + cmd.getCode());
+                String remoteAddr = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                try {
+                    doBeforeRpcHooks(remoteAddr, cmd);
+                } catch (Exception e) {
+                    exception = e;
+                }
+
+                if (exception == null) {
+                    response = pair.getObject1().processRequest(ctx, cmd);
+                } else {
+                    response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, 
null);
+                }
+
+                try {
+                    doAfterRpcHooks(remoteAddr, cmd, response);
+                } catch (Exception e) {
+                    exception = e;
+                }
+
+                if (exception != null) {
+                    throw exception;
                 }
 
                 if (!cmd.isOnewayRPC()) {
-                    final RemotingCommand response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
-                        "[OVERLOAD]system busy, start flow control for a 
while");
+                    if (response != null) {
+                        response.setOpaque(opaque);
+                        response.markResponseType();
+                        try {
+                            ctx.writeAndFlush(response);
+                        } catch (Throwable e) {
+                            log.error("process request over, but response 
failed", e);
+                            log.error(cmd.toString());
+                            log.error(response.toString());
+                        }
+                    }
+                }
+            } catch (Throwable e) {
+                log.error("process request exception", e);
+                log.error(cmd.toString());
+
+                if (!cmd.isOnewayRPC()) {
+                    response = 
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
+                            RemotingHelper.exceptionSimpleDesc(e));
                     response.setOpaque(opaque);
                     ctx.writeAndFlush(response);
                 }
             }
-        } else {
-            String error = " request type " + cmd.getCode() + " not supported";
-            final RemotingCommand response =
-                
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
 error);
-            response.setOpaque(opaque);
-            ctx.writeAndFlush(response);
-            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + 
error);
-        }
+        };
     }
 
     /**
@@ -322,16 +320,13 @@ public abstract class NettyRemotingAbstract {
         ExecutorService executor = this.getCallbackExecutor();
         if (executor != null) {
             try {
-                executor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            responseFuture.executeInvokeCallback();
-                        } catch (Throwable e) {
-                            log.warn("execute callback in executor exception, 
and callback throw", e);
-                        } finally {
-                            responseFuture.release();
-                        }
+                executor.submit(() -> {
+                    try {
+                        responseFuture.executeInvokeCallback();
+                    } catch (Throwable e) {
+                        log.warn("execute callback in executor exception, and 
callback throw", e);
+                    } finally {
+                        responseFuture.release();
                     }
                 });
             } catch (Exception e) {
@@ -386,7 +381,7 @@ public abstract class NettyRemotingAbstract {
      * </p>
      */
     public void scanResponseTable() {
-        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
+        final List<ResponseFuture> rfList = new LinkedList<>();
         Iterator<Entry<Integer, ResponseFuture>> it = 
this.responseTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<Integer, ResponseFuture> next = it.next();
@@ -412,27 +407,24 @@ public abstract class NettyRemotingAbstract {
     public RemotingCommand invokeSyncImpl(final Channel channel, final 
RemotingCommand request,
         final long timeoutMillis)
         throws InterruptedException, RemotingSendRequestException, 
RemotingTimeoutException {
+        //get the request id
         final int opaque = request.getOpaque();
 
         try {
             final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, timeoutMillis, null, null);
             this.responseTable.put(opaque, responseFuture);
             final SocketAddress addr = channel.remoteAddress();
-            channel.writeAndFlush(request).addListener(new 
ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture f) throws 
Exception {
-                    if (f.isSuccess()) {
-                        responseFuture.setSendRequestOK(true);
-                        return;
-                    } else {
-                        responseFuture.setSendRequestOK(false);
-                    }
-
-                    responseTable.remove(opaque);
-                    responseFuture.setCause(f.cause());
-                    responseFuture.putResponse(null);
-                    log.warn("Failed to write a request command to {}, caused 
by underlying I/O operation failure", addr);
+            channel.writeAndFlush(request).addListener((ChannelFutureListener) 
f -> {
+                if (f.isSuccess()) {
+                    responseFuture.setSendRequestOK(true);
+                    return;
                 }
+
+                responseFuture.setSendRequestOK(false);
+                responseTable.remove(opaque);
+                responseFuture.setCause(f.cause());
+                responseFuture.putResponse(null);
+                log.warn("Failed to write a request command to {}, caused by 
underlying I/O operation failure", addr);
             });
 
             RemotingCommand responseCommand = 
responseFuture.waitResponse(timeoutMillis);
@@ -468,16 +460,13 @@ public abstract class NettyRemotingAbstract {
             final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, timeoutMillis - costTime, invokeCallback, once);
             this.responseTable.put(opaque, responseFuture);
             try {
-                channel.writeAndFlush(request).addListener(new 
ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture f) throws 
Exception {
-                        if (f.isSuccess()) {
-                            responseFuture.setSendRequestOK(true);
-                            return;
-                        }
-                        requestFail(opaque);
-                        log.warn("send a request command to channel <{}> 
failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+                
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
+                    if (f.isSuccess()) {
+                        responseFuture.setSendRequestOK(true);
+                        return;
                     }
+                    requestFail(opaque);
+                    log.warn("send a request command to channel <{}> failed.", 
RemotingHelper.parseChannelRemoteAddr(channel));
                 });
             } catch (Exception e) {
                 responseFuture.release();
@@ -521,9 +510,7 @@ public abstract class NettyRemotingAbstract {
      * @param channel the channel which is close already
      */
     protected void failFast(final Channel channel) {
-        Iterator<Entry<Integer, ResponseFuture>> it = 
responseTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Integer, ResponseFuture> entry = it.next();
+        for (Entry<Integer, ResponseFuture> entry : responseTable.entrySet()) {
             if (entry.getValue().getChannel() == channel) {
                 Integer opaque = entry.getKey();
                 if (opaque != null) {
@@ -540,13 +527,10 @@ public abstract class NettyRemotingAbstract {
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new 
SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
             try {
-                channel.writeAndFlush(request).addListener(new 
ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture f) throws 
Exception {
-                        once.release();
-                        if (!f.isSuccess()) {
-                            log.warn("send a request command to channel <" + 
channel.remoteAddress() + "> failed.");
-                        }
+                
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
+                    once.release();
+                    if (!f.isSuccess()) {
+                        log.warn("send a request command to channel <" + 
channel.remoteAddress() + "> failed.");
                     }
                 });
             } catch (Exception e) {
@@ -571,11 +555,11 @@ public abstract class NettyRemotingAbstract {
     }
 
     class NettyEventExecutor extends ServiceThread {
-        private final LinkedBlockingQueue<NettyEvent> eventQueue = new 
LinkedBlockingQueue<NettyEvent>();
-        private final int maxSize = 10000;
+        private final LinkedBlockingQueue<NettyEvent> eventQueue = new 
LinkedBlockingQueue<>();
 
         public void putNettyEvent(final NettyEvent event) {
             int currentSize = this.eventQueue.size();
+            int maxSize = 10000;
             if (currentSize <= maxSize) {
                 this.eventQueue.add(event);
             } else {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index c3deb8a48..06bbae120 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -39,6 +39,7 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
@@ -52,6 +53,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -66,6 +68,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+@SuppressWarnings("NullableProblems")
 public class NettyRemotingServer extends NettyRemotingAbstract implements 
RemotingServer {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
     private final ServerBootstrap serverBootstrap;
@@ -83,7 +86,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
      * NettyRemotingServer may hold multiple SubRemotingServer, each server 
will be stored in this container with a
      * ListenPort key.
      */
-    private ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> 
remotingServerTable = new ConcurrentHashMap<Integer, NettyRemotingAbstract>();
+    private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> 
remotingServerTable = new ConcurrentHashMap<>();
 
     private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
     private static final String TLS_HANDLER_NAME = "sslHandler";
@@ -99,68 +102,81 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         this(nettyServerConfig, null);
     }
 
-    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
-        final ChannelEventListener channelEventListener) {
+    public NettyRemotingServer(final NettyServerConfig nettyServerConfig, 
final ChannelEventListener channelEventListener) {
         super(nettyServerConfig.getServerOnewaySemaphoreValue(), 
nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
         this.nettyServerConfig = nettyServerConfig;
         this.channelEventListener = channelEventListener;
 
-        int publicThreadNums = 
nettyServerConfig.getServerCallbackExecutorThreads();
-        if (publicThreadNums <= 0) {
-            publicThreadNums = 4;
-        }
+        this.publicExecutor = buildPublicExecutor(nettyServerConfig);
 
-        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, 
new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
+        this.eventLoopGroupBoss = buildBossEventLoopGroup();
 
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, "NettyServerPublicExecutor_" + 
this.threadIndex.incrementAndGet());
-            }
-        });
+        this.eventLoopGroupSelector = buildEventLoopGroupSelector();
 
+        loadSslContext();
+    }
+
+    private EventLoopGroup buildEventLoopGroupSelector() {
         if (useEpoll()) {
-            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new 
ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
+            return new 
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactory() {
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
+                private final int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
 
                 @Override
                 public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyEPOLLBoss_%d", 
this.threadIndex.incrementAndGet()));
+                    return new Thread(r, 
String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, 
this.threadIndex.incrementAndGet()));
                 }
             });
-
-            this.eventLoopGroupSelector = new 
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-                private int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
+        } else {
+            return new 
NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactory() {
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
+                private final int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
 
                 @Override
                 public Thread newThread(Runnable r) {
-                    return new Thread(r, 
String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, 
this.threadIndex.incrementAndGet()));
+                    return new Thread(r, 
String.format("NettyServerNIOSelector_%d_%d", threadTotal, 
this.threadIndex.incrementAndGet()));
                 }
             });
-        } else {
-            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new 
ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
+        }
+    }
+
+    private EventLoopGroup buildBossEventLoopGroup() {
+        if (useEpoll()) {
+            return new EpollEventLoopGroup(1, new ThreadFactory() {
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
 
                 @Override
                 public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyNIOBoss_%d", 
this.threadIndex.incrementAndGet()));
+                    return new Thread(r, String.format("NettyEPOLLBoss_%d", 
this.threadIndex.incrementAndGet()));
                 }
             });
-
-            this.eventLoopGroupSelector = new 
NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-                private int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
+        } else {
+            return new NioEventLoopGroup(1, new ThreadFactory() {
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
 
                 @Override
                 public Thread newThread(Runnable r) {
-                    return new Thread(r, 
String.format("NettyServerNIOSelector_%d_%d", threadTotal, 
this.threadIndex.incrementAndGet()));
+                    return new Thread(r, String.format("NettyNIOBoss_%d", 
this.threadIndex.incrementAndGet()));
                 }
             });
         }
+    }
 
-        loadSslContext();
+    private ExecutorService buildPublicExecutor(NettyServerConfig 
nettyServerConfig) {
+        int publicThreadNums = 
nettyServerConfig.getServerCallbackExecutorThreads();
+        if (publicThreadNums <= 0) {
+            publicThreadNums = 4;
+        }
+
+        return Executors.newFixedThreadPool(publicThreadNums, new 
ThreadFactory() {
+            private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "NettyServerPublicExecutor_" + 
this.threadIndex.incrementAndGet());
+            }
+        });
     }
 
     public void loadSslContext() {
@@ -171,9 +187,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             try {
                 sslContext = TlsHelper.buildSslContext(false);
                 log.info("SSLContext created for server");
-            } catch (CertificateException e) {
-                log.error("Failed to create SSLContext for server", e);
-            } catch (IOException e) {
+            } catch (CertificateException | IOException e) {
                 log.error("Failed to create SSLContext for server", e);
             }
         }
@@ -181,28 +195,27 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     private boolean useEpoll() {
         return RemotingUtil.isLinuxPlatform()
-            && nettyServerConfig.isUseEpollNativeSelector()
-            && Epoll.isAvailable();
+                && nettyServerConfig.isUseEpollNativeSelector()
+                && Epoll.isAvailable();
     }
 
     @Override
     public void start() {
         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
-            nettyServerConfig.getServerWorkerThreads(),
-            new ThreadFactory() {
+                nettyServerConfig.getServerWorkerThreads(),
+                new ThreadFactory() {
 
-                private AtomicInteger threadIndex = new AtomicInteger(0);
+                    private final AtomicInteger threadIndex = new 
AtomicInteger(0);
 
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "NettyServerCodecThread_" + 
this.threadIndex.incrementAndGet());
-                }
-            });
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "NettyServerCodecThread_" + 
this.threadIndex.incrementAndGet());
+                    }
+                });
 
         prepareSharableHandlers();
 
-        ServerBootstrap childHandler =
-            this.serverBootstrap.group(this.eventLoopGroupBoss, 
this.eventLoopGroupSelector)
+        serverBootstrap.group(this.eventLoopGroupBoss, 
this.eventLoopGroupSelector)
                 .channel(useEpoll() ? EpollServerSocketChannel.class : 
NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 1024)
                 .option(ChannelOption.SO_REUSEADDR, true)
@@ -211,39 +224,23 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                 .localAddress(new 
InetSocketAddress(this.nettyServerConfig.getListenPort()))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
-                    public void initChannel(SocketChannel ch) throws Exception 
{
+                    public void initChannel(SocketChannel ch) {
                         ch.pipeline()
-                            .addLast(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, handshakeHandler)
-                            .addLast(defaultEventExecutorGroup,
-                                encoder,
-                                new NettyDecoder(),
-                                new IdleStateHandler(0, 0, 
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                                connectionManageHandler,
-                                serverHandler
-                            );
+                                .addLast(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, handshakeHandler)
+                                .addLast(defaultEventExecutorGroup,
+                                        encoder,
+                                        new NettyDecoder(),
+                                        new IdleStateHandler(0, 0, 
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                                        connectionManageHandler,
+                                        serverHandler
+                                );
                     }
                 });
-        if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
-            log.info("server set SO_SNDBUF to {}", 
nettyServerConfig.getServerSocketSndBufSize());
-            childHandler.childOption(ChannelOption.SO_SNDBUF, 
nettyServerConfig.getServerSocketSndBufSize());
-        }
-        if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
-            log.info("server set SO_RCVBUF to {}", 
nettyServerConfig.getServerSocketRcvBufSize());
-            childHandler.childOption(ChannelOption.SO_RCVBUF, 
nettyServerConfig.getServerSocketRcvBufSize());
-        }
-        if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && 
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
-            log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
-                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark());
-            childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
new WriteBufferWaterMark(
-                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()));
-        }
 
-        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
-            childHandler.childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT);
-        }
+        addCustomConfig(serverBootstrap);
 
         try {
-            ChannelFuture sync = 
this.serverBootstrap.bind(nettyServerConfig.getListenPort()).sync();
+            ChannelFuture sync = 
serverBootstrap.bind(nettyServerConfig.getListenPort()).sync();
             InetSocketAddress addr = (InetSocketAddress) 
sync.channel().localAddress();
             if (0 == nettyServerConfig.getListenPort()) {
                 this.nettyServerConfig.setListenPort(addr.getPort());
@@ -271,20 +268,37 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }, 1000 * 3, 1000);
     }
 
+    private void addCustomConfig(ServerBootstrap childHandler) {
+        if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
+            log.info("server set SO_SNDBUF to {}", 
nettyServerConfig.getServerSocketSndBufSize());
+            childHandler.childOption(ChannelOption.SO_SNDBUF, 
nettyServerConfig.getServerSocketSndBufSize());
+        }
+        if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
+            log.info("server set SO_RCVBUF to {}", 
nettyServerConfig.getServerSocketRcvBufSize());
+            childHandler.childOption(ChannelOption.SO_RCVBUF, 
nettyServerConfig.getServerSocketRcvBufSize());
+        }
+        if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && 
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
+            log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
+                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark());
+            childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
new WriteBufferWaterMark(
+                    nettyServerConfig.getWriteBufferLowWaterMark(), 
nettyServerConfig.getWriteBufferHighWaterMark()));
+        }
+
+        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
+            childHandler.childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT);
+        }
+    }
+
     @Override
     public void shutdown() {
         try {
-            if (this.timer != null) {
-                this.timer.cancel();
-            }
+            this.timer.cancel();
 
             this.eventLoopGroupBoss.shutdownGracefully();
 
             this.eventLoopGroupSelector.shutdownGracefully();
 
-            if (this.nettyEventExecutor != null) {
-                this.nettyEventExecutor.shutdown();
-            }
+            this.nettyEventExecutor.shutdown();
 
             if (this.defaultEventExecutorGroup != null) {
                 this.defaultEventExecutorGroup.shutdownGracefully();
@@ -309,13 +323,13 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
             executorThis = this.publicExecutor;
         }
 
-        Pair<NettyRequestProcessor, ExecutorService> pair = new 
Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+        Pair<NettyRequestProcessor, ExecutorService> pair = new 
Pair<>(processor, executorThis);
         this.processorTable.put(requestCode, pair);
     }
 
     @Override
     public void registerDefaultProcessor(NettyRequestProcessor processor, 
ExecutorService executor) {
-        this.defaultRequestProcessor = new Pair<NettyRequestProcessor, 
ExecutorService>(processor, executor);
+        this.defaultRequestProcessorPair = new Pair<>(processor, executor);
     }
 
     @Override
@@ -330,14 +344,14 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     @Override
     public Pair<NettyRequestProcessor, ExecutorService> 
getDefaultProcessorPair() {
-        return defaultRequestProcessor;
+        return defaultRequestProcessorPair;
     }
 
     @Override
     public RemotingServer newRemotingServer(final int port) {
         SubRemotingServer remotingServer = new SubRemotingServer(port,
-            this.nettyServerConfig.getServerOnewaySemaphoreValue(),
-            this.nettyServerConfig.getServerAsyncSemaphoreValue());
+                this.nettyServerConfig.getServerOnewaySemaphoreValue(),
+                this.nettyServerConfig.getServerAsyncSemaphoreValue());
         NettyRemotingAbstract existingServer = 
this.remotingServerTable.putIfAbsent(port, remotingServer);
         if (existingServer != null) {
             throw new RuntimeException("The port " + port + " already in use 
by another RemotingServer");
@@ -352,19 +366,19 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
     @Override
     public RemotingCommand invokeSync(final Channel channel, final 
RemotingCommand request, final long timeoutMillis)
-        throws InterruptedException, RemotingSendRequestException, 
RemotingTimeoutException {
+            throws InterruptedException, RemotingSendRequestException, 
RemotingTimeoutException {
         return this.invokeSyncImpl(channel, request, timeoutMillis);
     }
 
     @Override
     public void invokeAsync(Channel channel, RemotingCommand request, long 
timeoutMillis, InvokeCallback invokeCallback)
-        throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
+            throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
         this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
     }
 
     @Override
     public void invokeOneway(Channel channel, RemotingCommand request, long 
timeoutMillis) throws InterruptedException,
-        RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
+            RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
         this.invokeOnewayImpl(channel, request, timeoutMillis);
     }
 
@@ -397,7 +411,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) 
throws Exception {
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
 
             // mark the current position so that we can peek the first byte to 
determine if the content is starting with
             // TLS handshake
@@ -415,8 +429,8 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                     case ENFORCING:
                         if (null != sslContext) {
                             ctx.pipeline()
-                                .addAfter(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, 
sslContext.newHandler(ctx.channel().alloc()))
-                                .addAfter(defaultEventExecutorGroup, 
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+                                    .addAfter(defaultEventExecutorGroup, 
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, 
sslContext.newHandler(ctx.channel().alloc()))
+                                    .addAfter(defaultEventExecutorGroup, 
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                             log.info("Handlers prepended to channel pipeline 
to establish SSL connection");
                         } else {
                             ctx.close();
@@ -503,7 +517,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
             if (evt instanceof IdleStateEvent) {
                 IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -512,7 +526,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
                     RemotingUtil.closeChannel(ctx.channel());
                     if (NettyRemotingServer.this.channelEventListener != null) 
{
                         NettyRemotingServer.this
-                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, 
remoteAddress, ctx.channel()));
+                                .putNettyEvent(new 
NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                     }
                 }
             }
@@ -521,7 +535,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
             final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
             log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", 
remoteAddress);
             log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", 
cause);
@@ -550,19 +564,19 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
         @Override
         public void registerProcessor(final int requestCode, final 
NettyRequestProcessor processor,
-            final ExecutorService executor) {
+                                      final ExecutorService executor) {
             ExecutorService executorThis = executor;
             if (null == executor) {
                 executorThis = NettyRemotingServer.this.publicExecutor;
             }
 
-            Pair<NettyRequestProcessor, ExecutorService> pair = new 
Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+            Pair<NettyRequestProcessor, ExecutorService> pair = new 
Pair<>(processor, executorThis);
             this.processorTable.put(requestCode, pair);
         }
 
         @Override
         public void registerDefaultProcessor(final NettyRequestProcessor 
processor, final ExecutorService executor) {
-            this.defaultRequestProcessor = new Pair<NettyRequestProcessor, 
ExecutorService>(processor, executor);
+            this.defaultRequestProcessorPair = new Pair<>(processor, executor);
         }
 
         @Override
@@ -577,36 +591,36 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
 
         @Override
         public Pair<NettyRequestProcessor, ExecutorService> 
getDefaultProcessorPair() {
-            return this.defaultRequestProcessor;
+            return this.defaultRequestProcessorPair;
         }
 
         @Override
         public RemotingServer newRemotingServer(final int port) {
             throw new UnsupportedOperationException("The SubRemotingServer of 
NettyRemotingServer " +
-                "doesn't support new nested RemotingServer");
+                    "doesn't support new nested RemotingServer");
         }
 
         @Override
         public void removeRemotingServer(final int port) {
             throw new UnsupportedOperationException("The SubRemotingServer of 
NettyRemotingServer " +
-                "doesn't support remove nested RemotingServer");
+                    "doesn't support remove nested RemotingServer");
         }
 
         @Override
         public RemotingCommand invokeSync(final Channel channel, final 
RemotingCommand request,
-            final long timeoutMillis) throws InterruptedException, 
RemotingSendRequestException, RemotingTimeoutException {
+                                          final long timeoutMillis) throws 
InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
             return this.invokeSyncImpl(channel, request, timeoutMillis);
         }
 
         @Override
         public void invokeAsync(final Channel channel, final RemotingCommand 
request, final long timeoutMillis,
-            final InvokeCallback invokeCallback) throws InterruptedException, 
RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
+                                final InvokeCallback invokeCallback) throws 
InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
             this.invokeAsyncImpl(channel, request, timeoutMillis, 
invokeCallback);
         }
 
         @Override
         public void invokeOneway(final Channel channel, final RemotingCommand 
request,
-            final long timeoutMillis) throws InterruptedException, 
RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException {
+                                 final long timeoutMillis) throws 
InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
             this.invokeOnewayImpl(channel, request, timeoutMillis);
         }
 

Reply via email to