This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c5db5ac3e [ISSUE #4125] Anonymous new Runnable() can be replaced with
lambda (#4031)
c5db5ac3e is described below
commit c5db5ac3e41bd9b922e507fba438dd89e223b2d9
Author: 李晓双 Li Xiao Shuang <[email protected]>
AuthorDate: Wed Apr 6 22:02:00 2022 +0800
[ISSUE #4125] Anonymous new Runnable() can be replaced with lambda (#4031)
---
.../apache/rocketmq/broker/BrokerController.java | 124 ++++++++-------------
1 file changed, 49 insertions(+), 75 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7bfc618b9..772eec6dc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -202,15 +202,15 @@ public class BrokerController {
this.slaveSynchronize = new SlaveSynchronize(this);
- this.sendThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
- this.putThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getPutThreadPoolQueueCapacity());
- this.pullThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
- this.replyThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
- this.queryThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
- this.clientManagerThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
- this.consumerManagerThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
- this.heartbeatThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
- this.endTransactionThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
+ this.sendThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+ this.putThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getPutThreadPoolQueueCapacity());
+ this.pullThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+ this.replyThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
+ this.queryThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
+ this.clientManagerThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
+ this.consumerManagerThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+ this.heartbeatThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+ this.endTransactionThreadPoolQueue = new
LinkedBlockingQueue<>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new
LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat()) : new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat());
@@ -350,70 +350,51 @@ public class BrokerController {
final long initialDelay = UtilAll.computeNextMorningTimeMillis() -
System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.getBrokerStats().record();
- } catch (Throwable e) {
- log.error("schedule record error.", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.getBrokerStats().record();
+ } catch (Throwable e) {
+ log.error("schedule record error.", e);
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.consumerOffsetManager.persist();
- } catch (Throwable e) {
- log.error("schedule persist consumerOffset error.", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.consumerOffsetManager.persist();
+ } catch (Throwable e) {
+ log.error("schedule persist consumerOffset error.", e);
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(),
TimeUnit.MILLISECONDS);
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.consumerFilterManager.persist();
- } catch (Throwable e) {
- log.error("schedule persist consumer filter error.",
e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.consumerFilterManager.persist();
+ } catch (Throwable e) {
+ log.error("schedule persist consumer filter error.", e);
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.protectBroker();
- } catch (Throwable e) {
- log.error("protectBroker error.", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.protectBroker();
+ } catch (Throwable e) {
+ log.error("protectBroker error.", e);
}
}, 3, 3, TimeUnit.MINUTES);
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.printWaterMark();
- } catch (Throwable e) {
- log.error("printWaterMark error.", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.printWaterMark();
+ } catch (Throwable e) {
+ log.error("printWaterMark error.", e);
}
}, 10, 1, TimeUnit.SECONDS);
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- log.info("dispatch behind commit log {} bytes",
BrokerController.this.getMessageStore().dispatchBehindBytes());
- } catch (Throwable e) {
- log.error("schedule dispatchBehindBytes error.", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ log.info("dispatch behind commit log {} bytes",
BrokerController.this.getMessageStore().dispatchBehindBytes());
+ } catch (Throwable e) {
+ log.error("schedule dispatchBehindBytes error.", e);
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
@@ -421,15 +402,11 @@ public class BrokerController {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}",
this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
- this.scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
-
- @Override
- public void run() {
- try {
- BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
- } catch (Throwable e) {
- log.error("ScheduledTask fetchNameServerAddr exception", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
+ } catch (Throwable e) {
+ log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
@@ -443,14 +420,11 @@ public class BrokerController {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
- this.scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.printMasterAndSlaveDiff();
- } catch (Throwable e) {
- log.error("schedule printMasterAndSlaveDiff error.", e);
- }
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ BrokerController.this.printMasterAndSlaveDiff();
+ } catch (Throwable e) {
+ log.error("schedule printMasterAndSlaveDiff error.", e);
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}