This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 895489ddb6 [ISSUE #9282] Optimize BrokerController#printWaterMark
(#9283)
895489ddb6 is described below
commit 895489ddb6dcca74229bdf3f44af937e13f1597f
Author: yx9o <[email protected]>
AuthorDate: Mon Mar 31 10:49:24 2025 +0800
[ISSUE #9282] Optimize BrokerController#printWaterMark (#9283)
---
.../apache/rocketmq/broker/BrokerController.java | 39 +++++++++++++++++-----
1 file changed, 30 insertions(+), 9 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a715ec3a4e..c6163499a9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
@@ -1284,16 +1285,36 @@ public class BrokerController {
return this.headSlowTimeMills(this.ackThreadPoolQueue);
}
+ public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
+ return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
+ }
+
+ public long headSlowTimeMills4ClientManagerThreadPoolQueue() {
+ return this.headSlowTimeMills(this.clientManagerThreadPoolQueue);
+ }
+
+ public long headSlowTimeMills4HeartbeatThreadPoolQueue() {
+ return this.headSlowTimeMills(this.heartbeatThreadPoolQueue);
+ }
+
+ public long headSlowTimeMills4AdminBrokerThreadPoolQueue() {
+ return this.headSlowTimeMills(this.adminBrokerThreadPoolQueue);
+ }
+
public void printWaterMark() {
- LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills:
{}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
- LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills:
{}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
- LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills:
{}", this.queryThreadPoolQueue.size(),
headSlowTimeMills4QueryThreadPoolQueue());
- LOG_WATER_MARK.info("[WATERMARK] Lite Pull Queue Size: {}
SlowTimeMills: {}", this.litePullThreadPoolQueue.size(),
headSlowTimeMills4LitePullThreadPoolQueue());
- LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {}
SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(),
headSlowTimeMills(this.endTransactionThreadPoolQueue));
- LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {}
SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(),
this.headSlowTimeMills(this.clientManagerThreadPoolQueue));
- LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {}
SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(),
this.headSlowTimeMills(this.heartbeatThreadPoolQueue));
- LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills:
{}", this.ackThreadPoolQueue.size(),
headSlowTimeMills(this.ackThreadPoolQueue));
- LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills:
{}", this.adminBrokerThreadPoolQueue.size(),
headSlowTimeMills(this.adminBrokerThreadPoolQueue));
+ logWaterMarkQueueInfo("Send", this.sendThreadPoolQueue,
this::headSlowTimeMills4SendThreadPoolQueue);
+ logWaterMarkQueueInfo("Pull", this.pullThreadPoolQueue,
this::headSlowTimeMills4PullThreadPoolQueue);
+ logWaterMarkQueueInfo("Query", this.queryThreadPoolQueue,
this::headSlowTimeMills4QueryThreadPoolQueue);
+ logWaterMarkQueueInfo("Lite Pull", this.litePullThreadPoolQueue,
this::headSlowTimeMills4LitePullThreadPoolQueue);
+ logWaterMarkQueueInfo("Transaction",
this.endTransactionThreadPoolQueue,
this::headSlowTimeMills4EndTransactionThreadPoolQueue);
+ logWaterMarkQueueInfo("ClientManager",
this.clientManagerThreadPoolQueue,
this::headSlowTimeMills4ClientManagerThreadPoolQueue);
+ logWaterMarkQueueInfo("Heartbeat", this.heartbeatThreadPoolQueue,
this::headSlowTimeMills4HeartbeatThreadPoolQueue);
+ logWaterMarkQueueInfo("Ack", this.ackThreadPoolQueue,
this::headSlowTimeMills4AckThreadPoolQueue);
+ logWaterMarkQueueInfo("Admin", this.adminBrokerThreadPoolQueue,
this::headSlowTimeMills4AdminBrokerThreadPoolQueue);
+ }
+
+ private void logWaterMarkQueueInfo(String queueName, BlockingQueue<?>
queue, Supplier<Long> slowTimeSupplier) {
+ LOG_WATER_MARK.info("[WATERMARK] {} Queue Size: {} SlowTimeMills: {}",
queueName, queue.size(), slowTimeSupplier.get());
}
public MessageStore getMessageStore() {