This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 19f799d936 [ISSUE #8340] RuntimeInfo and ClusterListSubCommand show
ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills (#8339)
19f799d936 is described below
commit 19f799d93617c17b093c9e372c5141f3ce32b292
Author: rongtong <[email protected]>
AuthorDate: Wed Feb 26 15:29:55 2025 +0800
[ISSUE #8340] RuntimeInfo and ClusterListSubCommand show
ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills (#8339)
* RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and
ackThreadPoolQueueHeadWaitTimeMills
* RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and
ackThreadPoolQueueHeadWaitTimeMills
---
.../org/apache/rocketmq/broker/BrokerController.java | 4 ++++
.../rocketmq/broker/processor/AdminBrokerProcessor.java | 5 +++++
.../tools/command/cluster/ClusterListSubCommand.java | 16 +++++++++-------
3 files changed, 18 insertions(+), 7 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 4031dce8d6..a715ec3a4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1280,6 +1280,10 @@ public class BrokerController {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
+ public long headSlowTimeMills4AckThreadPoolQueue() {
+ return this.headSlowTimeMills(this.ackThreadPoolQueue);
+ }
+
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills:
{}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills:
{}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 2247e90f56..4ff4bed814 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2801,10 +2801,15 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
+ runtimeInfo.put("ackThreadPoolQueueSize",
String.valueOf(this.brokerController.getAckThreadPoolQueue().size()));
+ runtimeInfo.put("ackThreadPoolQueueCapacity",
+
String.valueOf(this.brokerController.getBrokerConfig().getAckThreadPoolQueueCapacity()));
+
runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills",
String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills",
String.valueOf(brokerController.headSlowTimeMills4PullThreadPoolQueue()));
runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills",
String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
runtimeInfo.put("litePullThreadPoolQueueHeadWaitTimeMills",
String.valueOf(brokerController.headSlowTimeMills4LitePullThreadPoolQueue()));
+ runtimeInfo.put("ackThreadPoolQueueHeadWaitTimeMills",
String.valueOf(brokerController.headSlowTimeMills4AckThreadPoolQueue()));
runtimeInfo.put("EndTransactionQueueSize",
String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
index ede0fa5cf4..8103f4c7f8 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -177,9 +177,9 @@ public class ClusterListSubCommand implements SubCommand {
}
private void printClusterBaseInfo(final Set<String> clusterNames,
- final DefaultMQAdminExt
defaultMQAdminExt,
- final ClusterInfo clusterInfo) {
- System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %16s %-22s
%-11s %-12s %-8s %-10s%n",
+ final DefaultMQAdminExt defaultMQAdminExt,
+ final ClusterInfo clusterInfo) {
+ System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s
%-11s %-12s %-8s %-10s%n",
"#Cluster Name",
"#Broker Name",
"#BID",
@@ -212,8 +212,10 @@ public class ClusterListSubCommand implements SubCommand {
String version = "";
String sendThreadPoolQueueSize = "";
String pullThreadPoolQueueSize = "";
+ String ackThreadPoolQueueSize = "";
String sendThreadPoolQueueHeadWaitTimeMills = "";
String pullThreadPoolQueueHeadWaitTimeMills = "";
+ String ackThreadPoolQueueHeadWaitTimeMills = "";
String pageCacheLockTimeMills = "";
String earliestMessageTimeStamp = "";
String commitLogDiskRatio = "";
@@ -228,14 +230,14 @@ public class ClusterListSubCommand implements SubCommand {
isBrokerActive =
Boolean.parseBoolean(kvTable.getTable().get("brokerActive"));
String putTps = kvTable.getTable().get("putTps");
String getTransferredTps =
kvTable.getTable().get("getTransferredTps");
- sendThreadPoolQueueSize =
kvTable.getTable().get("sendThreadPoolQueueSize");
- pullThreadPoolQueueSize =
kvTable.getTable().get("pullThreadPoolQueueSize");
sendThreadPoolQueueSize =
kvTable.getTable().get("sendThreadPoolQueueSize");
pullThreadPoolQueueSize =
kvTable.getTable().get("pullThreadPoolQueueSize");
+ ackThreadPoolQueueSize =
kvTable.getTable().getOrDefault("ackThreadPoolQueueSize", "N");
sendThreadPoolQueueHeadWaitTimeMills =
kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills");
pullThreadPoolQueueHeadWaitTimeMills =
kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills");
+ ackThreadPoolQueueHeadWaitTimeMills =
kvTable.getTable().getOrDefault("ackThreadPoolQueueHeadWaitTimeMills", "N");
pageCacheLockTimeMills =
kvTable.getTable().get("pageCacheLockTimeMills");
earliestMessageTimeStamp =
kvTable.getTable().get("earliestMessageTimeStamp");
commitLogDiskRatio =
kvTable.getTable().get("commitLogDiskRatio");
@@ -280,14 +282,14 @@ public class ClusterListSubCommand implements SubCommand {
space = Double.parseDouble(commitLogDiskRatio);
}
- System.out.printf("%-22s %-22s %-4s %-22s %-16s
%16s %16s %-22s %11s %-12s %-8s %10s%n",
+ System.out.printf("%-22s %-22s %-4s %-22s %-16s
%16s %30s %-22s %11s %-12s %-8s %10s%n",
clusterName,
brokerName,
next1.getKey(),
next1.getValue(),
version,
String.format("%9.2f(%s,%sms)", in,
sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
- String.format("%9.2f(%s,%sms)", out,
pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
+ String.format("%9.2f(%s,%sms|%s,%sms)", out,
pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills,
ackThreadPoolQueueSize, ackThreadPoolQueueHeadWaitTimeMills),
String.format("%d-%d(%.1fw, %.1f, %.1f)",
timerReadBehind, timerOffsetBehind, timerCongestNum / 10000.0f,
timerEnqueueTps, timerDequeueTps),
pageCacheLockTimeMills,
String.format("%2.2f", hour),