This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9173def404 [ISSUE #8348] Allow custom fast-failure queues to be added
in BrokerFastFailure (#8347)
9173def404 is described below
commit 9173def4040f00ea5a4f1e913382b8ccd2b08d17
Author: rongtong <[email protected]>
AuthorDate: Thu Jul 4 16:39:53 2024 +0800
[ISSUE #8348] Allow custom fast-failure queues to be added in
BrokerFastFailure (#8347)
---
.../apache/rocketmq/broker/BrokerController.java | 2 +
.../rocketmq/broker/latency/BrokerFastFailure.java | 45 +++++++-------
.../broker/latency/BrokerFastFailureTest.java | 68 +++++++++++++++++++++-
3 files changed, 94 insertions(+), 21 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 76224db5cb..145a952230 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2519,4 +2519,6 @@ public class BrokerController {
public void setColdDataCgCtrService(ColdDataCgCtrService
coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}
+
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0135ac929a..ce8fdd8857 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -16,11 +16,15 @@
*/
package org.apache.rocketmq.broker.latency;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -42,13 +46,26 @@ public class BrokerFastFailure {
private volatile long jstackTime = System.currentTimeMillis();
+ private final List<Pair<BlockingQueue<Runnable>, Supplier<Long>>>
cleanExpiredRequestQueueList = new ArrayList<>();
+
public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
+ initCleanExpiredRequestQueueList();
this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
brokerController == null ? null :
brokerController.getBrokerConfig()));
}
+ private void initCleanExpiredRequestQueueList() {
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getSendThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()));
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getPullThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()));
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getLitePullThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue()));
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getHeartbeatThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()));
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getEndTransactionThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()));
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getAckThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue()));
+ cleanExpiredRequestQueueList.add(new
Pair<>(this.brokerController.getAdminBrokerThreadPoolQueue(), () ->
this.brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue()));
+ }
+
public static RequestTask castRunnable(final Runnable runnable) {
try {
if (runnable instanceof FutureTaskExt) {
@@ -98,26 +115,9 @@ public class BrokerFastFailure {
}
}
-
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
-
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
-
-
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
-
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
-
-
cleanExpiredRequestInQueue(this.brokerController.getLitePullThreadPoolQueue(),
-
this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue());
-
-
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
-
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
-
-
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(),
this
-
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
-
-
cleanExpiredRequestInQueue(this.brokerController.getAckThreadPoolQueue(),
- brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue());
-
-
cleanExpiredRequestInQueue(this.brokerController.getAdminBrokerThreadPoolQueue(),
-
brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue());
+ for (Pair<BlockingQueue<Runnable>, Supplier<Long>> pair :
cleanExpiredRequestQueueList) {
+ cleanExpiredRequestInQueue(pair.getObject1(),
pair.getObject2().get());
+ }
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable>
blockingQueue, final long maxWaitTimeMillsInQueue) {
@@ -154,6 +154,11 @@ public class BrokerFastFailure {
}
}
+ public synchronized void
addCleanExpiredRequestQueue(BlockingQueue<Runnable> cleanExpiredRequestQueue,
+ Supplier<Long> maxWaitTimeMillsInQueueSupplier) {
+ cleanExpiredRequestQueueList.add(new Pair<>(cleanExpiredRequestQueue,
maxWaitTimeMillsInQueueSupplier));
+ }
+
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
index 31b547cf1b..2216a1d50c 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -19,16 +19,46 @@ package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
public class BrokerFastFailureTest {
+
+ private BrokerController brokerController;
+
+ private final BrokerConfig brokerConfig = new BrokerConfig();
+
+ private MessageStore messageStore;
+
+ @Before
+ public void setUp() {
+ brokerController = Mockito.mock(BrokerController.class);
+ messageStore = Mockito.mock(DefaultMessageStore.class);
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+
Mockito.when(brokerController.getSendThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getPullThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getLitePullThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getHeartbeatThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getEndTransactionThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getAdminBrokerThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getAckThreadPoolQueue()).thenReturn(queue);
+
Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+ Mockito.when(messageStore.isOSPageCacheBusy()).thenReturn(false);
+
Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore);
+ }
+
@Test
public void testCleanExpiredRequestInQueue() throws Exception {
- BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
+ BrokerFastFailure brokerFastFailure = new
BrokerFastFailure(brokerController);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
@@ -63,4 +93,40 @@ public class BrokerFastFailureTest {
assertThat(((FutureTaskExt)
queue.peek()).getRunnable()).isEqualTo(requestTask);
}
+ @Test
+ public void testCleanExpiredCustomRequestInQueue() throws Exception {
+ BrokerFastFailure brokerFastFailure = new
BrokerFastFailure(brokerController);
+ brokerFastFailure.start();
+ brokerConfig.setWaitTimeMillsInAckQueue(10);
+ BlockingQueue<Runnable> customThreadPoolQueue = new
LinkedBlockingQueue<>();
+ brokerFastFailure.addCleanExpiredRequestQueue(customThreadPoolQueue,
() -> brokerConfig.getWaitTimeMillsInAckQueue());
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+
+ }
+ };
+ RequestTask requestTask = new RequestTask(runnable, null, null);
+ customThreadPoolQueue.add(new FutureTaskExt<>(requestTask, null));
+
+ Thread.sleep(2000);
+
+ assertThat(customThreadPoolQueue.size()).isEqualTo(0);
+ assertThat(requestTask.isStopRun()).isEqualTo(true);
+
+ brokerConfig.setWaitTimeMillsInAckQueue(10000);
+
+ RequestTask requestTask2 = new RequestTask(runnable, null, null);
+ customThreadPoolQueue.add(new FutureTaskExt<>(requestTask2, null));
+
+ Thread.sleep(1000);
+
+ assertThat(customThreadPoolQueue.size()).isEqualTo(1);
+ assertThat(((FutureTaskExt)
customThreadPoolQueue.peek()).getRunnable()).isEqualTo(requestTask2);
+
+ brokerFastFailure.shutdown();
+
+ }
+
}
\ No newline at end of file