This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 89e021e871 [ISSUE #9898] Remove AbstractBrokerRunnable and replace with Runnable
89e021e871 is described below
commit 89e021e871b843b52a1aecc2dcd3adad61089ef6
Author: rongtong <[email protected]>
AuthorDate: Fri Dec 5 14:03:54 2025 +0800
[ISSUE #9898] Remove AbstractBrokerRunnable and replace with Runnable
Change-Id: I94104151c452d09cbe195a3e1126a473b662a337
Co-authored-by: RongtongJin <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 13 +++---
.../client/DefaultConsumerIdsChangeListener.java | 5 +--
.../rocketmq/broker/latency/BrokerFastFailure.java | 5 +--
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 35 +++++++---------
.../rocketmq/common/AbstractBrokerRunnable.java | 48 ----------------------
.../apache/rocketmq/container/BrokerContainer.java | 13 +++---
.../rocketmq/container/InnerBrokerController.java | 9 ++--
.../apache/rocketmq/store/DefaultMessageStore.java | 17 ++++----
.../apache/rocketmq/store/index/IndexService.java | 5 +--
9 files changed, 45 insertions(+), 105 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index de1cbfc823..76882cc71c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -121,7 +121,6 @@ import
org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.HookUtils;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
@@ -1814,9 +1813,9 @@ public class BrokerController {
this.registerBrokerAll(true, false, true);
}
-
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after
{}", shouldStartTime);
@@ -1836,9 +1835,9 @@ public class BrokerController {
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();
-
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
BrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
@@ -1869,9 +1868,9 @@ public class BrokerController {
}
protected void scheduleSendHeartbeat() {
-
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new
Runnable() {
@Override
- public void run0() {
+ public void run() {
if (isIsolated) {
return;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index e046176956..2946e03e1a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -48,9 +47,9 @@ public class DefaultConsumerIdsChangeListener implements
ConsumerIdsChangeListen
public DefaultConsumerIdsChangeListener(BrokerController brokerController)
{
this.brokerController = brokerController;
- scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
+ scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
notifyConsumerChange();
} catch (Exception e) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index ce8fdd8857..31bdb838a2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
@@ -80,9 +79,9 @@ public class BrokerFastFailure {
}
public void start() {
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.brokerController.getBrokerConfig()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
if
(brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 21ba349c84..ba4ba2ccf9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -46,7 +46,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MixAll;
@@ -356,18 +355,14 @@ public class BrokerOuterAPI {
requestHeader.setClusterName(clusterName);
for (final String namesrvAddr : nameServerAddressList) {
- brokerOuterExecutor.execute(new AbstractBrokerRunnable(new
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
-
- @Override
- public void run0() {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION,
requestHeader);
- request.setBody(dataVersion.encode());
+ brokerOuterExecutor.execute(() -> {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION,
requestHeader);
+ request.setBody(dataVersion.encode());
- try {
-
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request,
timeoutMillis);
- } catch (Exception e) {
- LOGGER.error("sendHeartbeat Exception " +
namesrvAddr, e);
- }
+ try {
+
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request,
timeoutMillis);
+ } catch (Exception e) {
+ LOGGER.error("sendHeartbeat Exception " + namesrvAddr,
e);
}
});
}
@@ -389,9 +384,9 @@ public class BrokerOuterAPI {
if (nameServerAddressList != null && nameServerAddressList.size() > 0)
{
for (final String namesrvAddr : nameServerAddressList) {
- brokerOuterExecutor.execute(new AbstractBrokerRunnable(new
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+ brokerOuterExecutor.execute(new Runnable() {
@Override
- public void run0() {
+ public void run() {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT,
requestHeader);
try {
@@ -532,9 +527,9 @@ public class BrokerOuterAPI {
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new
CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
- brokerOuterExecutor.execute(new
AbstractBrokerRunnable(brokerIdentity) {
+ brokerOuterExecutor.execute(new Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
RegisterBrokerResult result =
registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
@@ -719,9 +714,9 @@ public class BrokerOuterAPI {
if (nameServerAddressList != null && nameServerAddressList.size() > 0)
{
final CountDownLatch countDownLatch = new
CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
- brokerOuterExecutor.execute(new AbstractBrokerRunnable(new
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+ brokerOuterExecutor.execute(new Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
QueryDataVersionRequestHeader requestHeader = new
QueryDataVersionRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
@@ -1501,9 +1496,9 @@ public class BrokerOuterAPI {
requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
requestHeader.setElectionPriority(electionPriority);
requestHeader.setBrokerId(brokerId);
- brokerOuterExecutor.execute(new AbstractBrokerRunnable(new
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+ brokerOuterExecutor.execute(new Runnable() {
@Override
- public void run0() {
+ public void run() {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT,
requestHeader);
try {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java
b/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java
deleted file mode 100644
index 34aabc5772..0000000000
---
a/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.common;
-
-import java.io.File;
-import org.apache.rocketmq.logging.org.slf4j.MDC;
-
-public abstract class AbstractBrokerRunnable implements Runnable {
- protected final BrokerIdentity brokerIdentity;
-
- public AbstractBrokerRunnable(BrokerIdentity brokerIdentity) {
- this.brokerIdentity = brokerIdentity;
- }
-
- private static final String MDC_BROKER_CONTAINER_LOG_DIR =
"brokerContainerLogDir";
-
- /**
- * real logic for running
- */
- public abstract void run0();
-
- @Override
- public void run() {
- try {
- if (brokerIdentity.isInBrokerContainer()) {
- MDC.put(MDC_BROKER_CONTAINER_LOG_DIR, File.separator +
brokerIdentity.getCanonicalName());
- }
- run0();
- } finally {
- MDC.clear();
- }
- }
-}
diff --git
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index aa38fb6224..e8debfe99b 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -22,7 +22,6 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.broker.ConfigContext;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
@@ -156,9 +155,9 @@ public class BrokerContainer implements IBrokerContainer {
this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}",
this.brokerContainerConfig.getNamesrvAddr());
// also auto update namesrv if specify
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
BrokerContainer.this.updateNamesrvAddr();
} catch (Throwable e) {
@@ -167,10 +166,10 @@ public class BrokerContainer implements IBrokerContainer {
}
}, 1000 * 10,
this.brokerContainerConfig.getUpdateNamesrvAddrInterval(),
TimeUnit.MILLISECONDS);
} else if
(this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
@@ -180,9 +179,9 @@ public class BrokerContainer implements IBrokerContainer {
}, 1000 * 10,
this.brokerContainerConfig.getFetchNamesrvAddrInterval(),
TimeUnit.MILLISECONDS);
}
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
BrokerContainer.this.brokerOuterAPI.refreshMetadata();
} catch (Exception e) {
diff --git
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 41ce28214b..102bd4710f 100644
---
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RemotingServer;
@@ -82,9 +81,9 @@ public class InnerBrokerController extends BrokerController {
this.registerBrokerAll(true, false, true);
}
-
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new
Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after
{}", shouldStartTime);
@@ -104,9 +103,9 @@ public class InnerBrokerController extends BrokerController
{
if (this.brokerConfig.isEnableSlaveActingMaster()) {
scheduleSendHeartbeat();
-
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
Runnable() {
@Override
- public void run0() {
+ public void run() {
try {
InnerBrokerController.this.syncBrokerMemberGroup();
} catch (Throwable e) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index cb5c41471a..d440ccfb11 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
@@ -1775,23 +1774,23 @@ public class DefaultMessageStore implements
MessageStore {
private void addScheduleTask() {
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(),
TimeUnit.MILLISECONDS);
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
DefaultMessageStore.this.checkSelf();
}
}, 1, 10, TimeUnit.MINUTES);
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
if
(DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if
(DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
@@ -1810,9 +1809,9 @@ public class DefaultMessageStore implements MessageStore {
}
}, 1, 1, TimeUnit.SECONDS);
- this.scheduledExecutorService.scheduleAtFixedRate(new
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
- public void run0() {
+ public void run() {
DefaultMessageStore.this.storeCheckpoint.flush();
}
}, 1, 1, TimeUnit.SECONDS);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index 2d325ee13a..4d358b4ced 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -339,9 +338,9 @@ public class IndexService {
if (indexFile != null) {
final IndexFile flushThisFile = prevIndexFile;
- Thread flushThread = new Thread(new
AbstractBrokerRunnable(defaultMessageStore.getBrokerConfig()) {
+ Thread flushThread = new Thread(new Runnable() {
@Override
- public void run0() {
+ public void run() {
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");