This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c5db5ac3e [ISSUE #4125] Anonymous new Runnable() can be replaced with 
lambda (#4031)
c5db5ac3e is described below

commit c5db5ac3e41bd9b922e507fba438dd89e223b2d9
Author: 李晓双 Li Xiao Shuang <[email protected]>
AuthorDate: Wed Apr 6 22:02:00 2022 +0800

    [ISSUE #4125] Anonymous new Runnable() can be replaced with lambda (#4031)
---
 .../apache/rocketmq/broker/BrokerController.java   | 124 ++++++++-------------
 1 file changed, 49 insertions(+), 75 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7bfc618b9..772eec6dc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -202,15 +202,15 @@ public class BrokerController {
 
         this.slaveSynchronize = new SlaveSynchronize(this);
 
-        this.sendThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
-        this.putThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getPutThreadPoolQueueCapacity());
-        this.pullThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
-        this.replyThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
-        this.queryThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
-        this.clientManagerThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
-        this.consumerManagerThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
-        this.heartbeatThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
-        this.endTransactionThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
+        this.sendThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+        this.putThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getPutThreadPoolQueueCapacity());
+        this.pullThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.replyThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
+        this.queryThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
+        this.clientManagerThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
+        this.consumerManagerThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+        this.heartbeatThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+        this.endTransactionThreadPoolQueue = new 
LinkedBlockingQueue<>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
 
         this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new 
LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.isEnableDetailStat()) : new 
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.isEnableDetailStat());
 
@@ -350,70 +350,51 @@ public class BrokerController {
 
             final long initialDelay = UtilAll.computeNextMorningTimeMillis() - 
System.currentTimeMillis();
             final long period = 1000 * 60 * 60 * 24;
-            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        BrokerController.this.getBrokerStats().record();
-                    } catch (Throwable e) {
-                        log.error("schedule record error.", e);
-                    }
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    BrokerController.this.getBrokerStats().record();
+                } catch (Throwable e) {
+                    log.error("schedule record error.", e);
                 }
             }, initialDelay, period, TimeUnit.MILLISECONDS);
 
-            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        BrokerController.this.consumerOffsetManager.persist();
-                    } catch (Throwable e) {
-                        log.error("schedule persist consumerOffset error.", e);
-                    }
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    BrokerController.this.consumerOffsetManager.persist();
+                } catch (Throwable e) {
+                    log.error("schedule persist consumerOffset error.", e);
                 }
             }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), 
TimeUnit.MILLISECONDS);
 
-            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        BrokerController.this.consumerFilterManager.persist();
-                    } catch (Throwable e) {
-                        log.error("schedule persist consumer filter error.", 
e);
-                    }
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    BrokerController.this.consumerFilterManager.persist();
+                } catch (Throwable e) {
+                    log.error("schedule persist consumer filter error.", e);
                 }
             }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
 
-            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        BrokerController.this.protectBroker();
-                    } catch (Throwable e) {
-                        log.error("protectBroker error.", e);
-                    }
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    BrokerController.this.protectBroker();
+                } catch (Throwable e) {
+                    log.error("protectBroker error.", e);
                 }
             }, 3, 3, TimeUnit.MINUTES);
 
-            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        BrokerController.this.printWaterMark();
-                    } catch (Throwable e) {
-                        log.error("printWaterMark error.", e);
-                    }
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    BrokerController.this.printWaterMark();
+                } catch (Throwable e) {
+                    log.error("printWaterMark error.", e);
                 }
             }, 10, 1, TimeUnit.SECONDS);
 
-            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        log.info("dispatch behind commit log {} bytes", 
BrokerController.this.getMessageStore().dispatchBehindBytes());
-                    } catch (Throwable e) {
-                        log.error("schedule dispatchBehindBytes error.", e);
-                    }
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    log.info("dispatch behind commit log {} bytes", 
BrokerController.this.getMessageStore().dispatchBehindBytes());
+                } catch (Throwable e) {
+                    log.error("schedule dispatchBehindBytes error.", e);
                 }
             }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
 
@@ -421,15 +402,11 @@ public class BrokerController {
                 
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                 log.info("Set user specified name server address: {}", 
this.brokerConfig.getNamesrvAddr());
             } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
-                this.scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
-
-                    @Override
-                    public void run() {
-                        try {
-                            
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
-                        } catch (Throwable e) {
-                            log.error("ScheduledTask fetchNameServerAddr 
exception", e);
-                        }
+                this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                    try {
+                        
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
+                    } catch (Throwable e) {
+                        log.error("ScheduledTask fetchNameServerAddr 
exception", e);
                     }
                 }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
             }
@@ -443,14 +420,11 @@ public class BrokerController {
                         this.updateMasterHAServerAddrPeriodically = true;
                     }
                 } else {
-                    this.scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                
BrokerController.this.printMasterAndSlaveDiff();
-                            } catch (Throwable e) {
-                                log.error("schedule printMasterAndSlaveDiff 
error.", e);
-                            }
+                    this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                        try {
+                            BrokerController.this.printMasterAndSlaveDiff();
+                        } catch (Throwable e) {
+                            log.error("schedule printMasterAndSlaveDiff 
error.", e);
                         }
                     }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                 }

Reply via email to