This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 89e021e871 [ISSUE #9898] Remove AbstractBrokerRunnable and replace 
with Runnable
89e021e871 is described below

commit 89e021e871b843b52a1aecc2dcd3adad61089ef6
Author: rongtong <[email protected]>
AuthorDate: Fri Dec 5 14:03:54 2025 +0800

    [ISSUE #9898] Remove AbstractBrokerRunnable and replace with Runnable
    
    Change-Id: I94104151c452d09cbe195a3e1126a473b662a337
    
    Co-authored-by: RongtongJin <[email protected]>
---
 .../apache/rocketmq/broker/BrokerController.java   | 13 +++---
 .../client/DefaultConsumerIdsChangeListener.java   |  5 +--
 .../rocketmq/broker/latency/BrokerFastFailure.java |  5 +--
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 35 +++++++---------
 .../rocketmq/common/AbstractBrokerRunnable.java    | 48 ----------------------
 .../apache/rocketmq/container/BrokerContainer.java | 13 +++---
 .../rocketmq/container/InnerBrokerController.java  |  9 ++--
 .../apache/rocketmq/store/DefaultMessageStore.java | 17 ++++----
 .../apache/rocketmq/store/index/IndexService.java  |  5 +--
 9 files changed, 45 insertions(+), 105 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index de1cbfc823..76882cc71c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -121,7 +121,6 @@ import 
org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
 import 
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
 import org.apache.rocketmq.broker.util.HookUtils;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
 import org.apache.rocketmq.common.MixAll;
@@ -1814,9 +1813,9 @@ public class BrokerController {
             this.registerBrokerAll(true, false, true);
         }
 
-        
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 try {
                     if (System.currentTimeMillis() < shouldStartTime) {
                         BrokerController.LOG.info("Register to namesrv after 
{}", shouldStartTime);
@@ -1836,9 +1835,9 @@ public class BrokerController {
         if (this.brokerConfig.isEnableSlaveActingMaster()) {
             scheduleSendHeartbeat();
 
-            
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
 AbstractBrokerRunnable(this.getBrokerIdentity()) {
+            
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
 Runnable() {
                 @Override
-                public void run0() {
+                public void run() {
                     try {
                         BrokerController.this.syncBrokerMemberGroup();
                     } catch (Throwable e) {
@@ -1869,9 +1868,9 @@ public class BrokerController {
     }
 
     protected void scheduleSendHeartbeat() {
-        
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new
 AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        
scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new
 Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 if (isIsolated) {
                     return;
                 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index e046176956..2946e03e1a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -48,9 +47,9 @@ public class DefaultConsumerIdsChangeListener implements 
ConsumerIdsChangeListen
     public DefaultConsumerIdsChangeListener(BrokerController brokerController) 
{
         this.brokerController = brokerController;
 
-        scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(brokerController.getBrokerConfig()) {
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 try {
                     notifyConsumerChange();
                 } catch (Exception e) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index ce8fdd8857..31bdb838a2 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
@@ -80,9 +79,9 @@ public class BrokerFastFailure {
     }
 
     public void start() {
-        this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.brokerController.getBrokerConfig()) {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 if 
(brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                     cleanExpiredRequest();
                 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 21ba349c84..ba4ba2ccf9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -46,7 +46,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerIdentity;
 import org.apache.rocketmq.common.LockCallback;
 import org.apache.rocketmq.common.MixAll;
@@ -356,18 +355,14 @@ public class BrokerOuterAPI {
             requestHeader.setClusterName(clusterName);
 
             for (final String namesrvAddr : nameServerAddressList) {
-                brokerOuterExecutor.execute(new AbstractBrokerRunnable(new 
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
-
-                    @Override
-                    public void run0() {
-                        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, 
requestHeader);
-                        request.setBody(dataVersion.encode());
+                brokerOuterExecutor.execute(() -> {
+                    RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, 
requestHeader);
+                    request.setBody(dataVersion.encode());
 
-                        try {
-                            
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, 
timeoutMillis);
-                        } catch (Exception e) {
-                            LOGGER.error("sendHeartbeat Exception " + 
namesrvAddr, e);
-                        }
+                    try {
+                        
BrokerOuterAPI.this.remotingClient.invokeOneway(namesrvAddr, request, 
timeoutMillis);
+                    } catch (Exception e) {
+                        LOGGER.error("sendHeartbeat Exception " + namesrvAddr, 
e);
                     }
                 });
             }
@@ -389,9 +384,9 @@ public class BrokerOuterAPI {
 
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
             for (final String namesrvAddr : nameServerAddressList) {
-                brokerOuterExecutor.execute(new AbstractBrokerRunnable(new 
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+                brokerOuterExecutor.execute(new Runnable() {
                     @Override
-                    public void run0() {
+                    public void run() {
                         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, 
requestHeader);
 
                         try {
@@ -532,9 +527,9 @@ public class BrokerOuterAPI {
             requestHeader.setBodyCrc32(bodyCrc32);
             final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
-                brokerOuterExecutor.execute(new 
AbstractBrokerRunnable(brokerIdentity) {
+                brokerOuterExecutor.execute(new Runnable() {
                     @Override
-                    public void run0() {
+                    public void run() {
                         try {
                             RegisterBrokerResult result = 
registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                             if (result != null) {
@@ -719,9 +714,9 @@ public class BrokerOuterAPI {
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) 
{
             final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
-                brokerOuterExecutor.execute(new AbstractBrokerRunnable(new 
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+                brokerOuterExecutor.execute(new Runnable() {
                     @Override
-                    public void run0() {
+                    public void run() {
                         try {
                             QueryDataVersionRequestHeader requestHeader = new 
QueryDataVersionRequestHeader();
                             requestHeader.setBrokerAddr(brokerAddr);
@@ -1501,9 +1496,9 @@ public class BrokerOuterAPI {
         
requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
         requestHeader.setElectionPriority(electionPriority);
         requestHeader.setBrokerId(brokerId);
-        brokerOuterExecutor.execute(new AbstractBrokerRunnable(new 
BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
+        brokerOuterExecutor.execute(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, 
requestHeader);
 
                 try {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java 
b/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java
deleted file mode 100644
index 34aabc5772..0000000000
--- 
a/common/src/main/java/org/apache/rocketmq/common/AbstractBrokerRunnable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.common;
-
-import java.io.File;
-import org.apache.rocketmq.logging.org.slf4j.MDC;
-
-public abstract class AbstractBrokerRunnable implements Runnable {
-    protected final BrokerIdentity brokerIdentity;
-
-    public AbstractBrokerRunnable(BrokerIdentity brokerIdentity) {
-        this.brokerIdentity = brokerIdentity;
-    }
-
-    private static final String MDC_BROKER_CONTAINER_LOG_DIR = 
"brokerContainerLogDir";
-
-    /**
-     * real logic for running
-     */
-    public abstract void run0();
-
-    @Override
-    public void run() {
-        try {
-            if (brokerIdentity.isInBrokerContainer()) {
-                MDC.put(MDC_BROKER_CONTAINER_LOG_DIR, File.separator + 
brokerIdentity.getCanonicalName());
-            }
-            run0();
-        } finally {
-            MDC.clear();
-        }
-    }
-}
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index aa38fb6224..e8debfe99b 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -22,7 +22,6 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.broker.ConfigContext;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
 import org.apache.rocketmq.common.MixAll;
@@ -156,9 +155,9 @@ public class BrokerContainer implements IBrokerContainer {
             this.updateNamesrvAddr();
             LOG.info("Set user specified name server address: {}", 
this.brokerContainerConfig.getNamesrvAddr());
             // also auto update namesrv if specify
-            this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                 @Override
-                public void run0() {
+                public void run() {
                     try {
                         BrokerContainer.this.updateNamesrvAddr();
                     } catch (Throwable e) {
@@ -167,10 +166,10 @@ public class BrokerContainer implements IBrokerContainer {
                 }
             }, 1000 * 10, 
this.brokerContainerConfig.getUpdateNamesrvAddrInterval(), 
TimeUnit.MILLISECONDS);
         } else if 
(this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
-            this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
                 @Override
-                public void run0() {
+                public void run() {
                     try {
                         
BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr();
                     } catch (Throwable e) {
@@ -180,9 +179,9 @@ public class BrokerContainer implements IBrokerContainer {
             }, 1000 * 10, 
this.brokerContainerConfig.getFetchNamesrvAddrInterval(), 
TimeUnit.MILLISECONDS);
         }
 
-        this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 try {
                     BrokerContainer.this.brokerOuterAPI.refreshMetadata();
                 } catch (Exception e) {
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 41ce28214b..102bd4710f 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.auth.config.AuthConfig;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.RemotingServer;
@@ -82,9 +81,9 @@ public class InnerBrokerController extends BrokerController {
             this.registerBrokerAll(true, false, true);
         }
 
-        
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new 
Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 try {
                     if (System.currentTimeMillis() < shouldStartTime) {
                         BrokerController.LOG.info("Register to namesrv after 
{}", shouldStartTime);
@@ -104,9 +103,9 @@ public class InnerBrokerController extends BrokerController 
{
         if (this.brokerConfig.isEnableSlaveActingMaster()) {
             scheduleSendHeartbeat();
 
-            
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
 AbstractBrokerRunnable(this.getBrokerIdentity()) {
+            
scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new
 Runnable() {
                 @Override
-                public void run0() {
+                public void run() {
                     try {
                         InnerBrokerController.this.syncBrokerMemberGroup();
                     } catch (Throwable e) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index cb5c41471a..d440ccfb11 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
@@ -1775,23 +1774,23 @@ public class DefaultMessageStore implements 
MessageStore {
 
     private void addScheduleTask() {
 
-        this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 DefaultMessageStore.this.cleanFilesPeriodically();
             }
         }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), 
TimeUnit.MILLISECONDS);
 
-        this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 DefaultMessageStore.this.checkSelf();
             }
         }, 1, 10, TimeUnit.MINUTES);
 
-        this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 if 
(DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                     try {
                         if 
(DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
@@ -1810,9 +1809,9 @@ public class DefaultMessageStore implements MessageStore {
             }
         }, 1, 1, TimeUnit.SECONDS);
 
-        this.scheduledExecutorService.scheduleAtFixedRate(new 
AbstractBrokerRunnable(this.getBrokerIdentity()) {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
-            public void run0() {
+            public void run() {
                 DefaultMessageStore.this.storeCheckpoint.flush();
             }
         }, 1, 1, TimeUnit.SECONDS);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java 
b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index 2d325ee13a..4d358b4ced 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -339,9 +338,9 @@ public class IndexService {
             if (indexFile != null) {
                 final IndexFile flushThisFile = prevIndexFile;
 
-                Thread flushThread = new Thread(new 
AbstractBrokerRunnable(defaultMessageStore.getBrokerConfig()) {
+                Thread flushThread = new Thread(new Runnable() {
                     @Override
-                    public void run0() {
+                    public void run() {
                         IndexService.this.flush(flushThisFile);
                     }
                 }, "FlushIndexFileThread");

Reply via email to