This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 579f22c8449 [improve][broker] PIP-192 Added --extensions option in
BrokerMonitor (#19654)
579f22c8449 is described below
commit 579f22c8449be287ee1209a477aeaad346495289
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 1 01:02:11 2023 -0800
[improve][broker] PIP-192 Added --extensions option in BrokerMonitor
(#19654)
---
.../apache/pulsar/testclient/BrokerMonitor.java | 94 ++++++++++++++++++++--
1 file changed, 88 insertions(+), 6 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index c209c34a3d7..3f896986016 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.testclient;
+import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
@@ -31,7 +32,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -54,7 +61,7 @@ public class BrokerMonitor {
private static final String BROKER_ROOT = "/loadbalance/brokers";
private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
- private final ZooKeeper zkClient;
+ private ZooKeeper zkClient;
private static final Gson gson = new Gson();
// Fields common for message rows.
@@ -77,7 +84,7 @@ public class BrokerMonitor {
private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC
MSG");
private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE",
"MSG/S", "LONG/S", "KB/S", "MAX %" };
- private final Map<String, Object> loadData;
+ private Map<String, Object> loadData;
private static final FixedColumnLengthTableMaker localTableMaker = new
FixedColumnLengthTableMaker();
static {
@@ -434,8 +441,11 @@ public class BrokerMonitor {
@Parameter(names = { "-h", "--help" }, description = "Help message",
help = true)
boolean help;
- @Parameter(names = { "--connect-string" }, description = "Zookeeper
connect string", required = true)
+ @Parameter(names = { "--connect-string" }, description = "Zookeeper or
broker connect string", required = true)
public String connectString = null;
+
+ @Parameter(names = { "--extensions" }, description = "true to monitor
Load Balance Extensions.")
+ boolean extensions = false;
}
/**
@@ -464,6 +474,71 @@ public class BrokerMonitor {
}
}
+ // Table view of per-broker load data; assigned by the BrokerMonitor(String) constructor
+ // from BROKER_LOAD_DATA_STORE_TOPIC and read by printBrokerLoadDataStore().
+ private TableView<BrokerLoadData> brokerLoadDataTableView;
+
+    /**
+     * Creates a monitor that reads broker load data through a Pulsar client.
+     * A {@link TableView} over {@code BROKER_LOAD_DATA_STORE_TOPIC} is opened and stored in
+     * {@code brokerLoadDataTableView}.
+     *
+     * @param brokerServiceUrl service URL handed to the Pulsar client builder
+     * @throws RuntimeException wrapping any failure while building the client or the table view
+     */
+    private BrokerMonitor(String brokerServiceUrl) {
+        PulsarClient client = null;
+        try {
+            client = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(brokerServiceUrl)
+                    .connectionsPerBroker(4)
+                    .ioThreads(Runtime.getRuntime().availableProcessors())
+                    .statsInterval(0, TimeUnit.SECONDS)
+                    .build();
+            this.brokerLoadDataTableView = client
+                    .newTableView(Schema.JSON(BrokerLoadData.class))
+                    .topic(BROKER_LOAD_DATA_STORE_TOPIC).create();
+        } catch (Throwable e) {
+            // A failed startup is an error, not an informational event.
+            log.error("Failed to start BrokerMonitor", e);
+            if (client != null) {
+                // The client was built but the table view was not: release it instead of leaking it.
+                try {
+                    client.close();
+                } catch (Exception closeError) {
+                    // Keep the original failure as the primary cause.
+                    e.addSuppressed(closeError);
+                }
+            }
+            throw new RuntimeException(e);
+        }
+    }
+
+ private synchronized void printBrokerLoadData(final String broker, final
BrokerLoadData brokerLoadData) {
+
+ // Initialize the constant rows.
+ final Object[][] rows = new Object[6][];
+ rows[0] = SYSTEM_ROW;
+ rows[2] = COUNT_ROW;
+ rows[4] = LATEST_ROW;
+
+ // First column is a label, so start at the second column at index 1.
+ // System row.
+ rows[1] = new Object[SYSTEM_ROW.length];
+ initRow(rows[1], brokerLoadData.getCpu().percentUsage(),
brokerLoadData.getMemory().percentUsage(),
+ brokerLoadData.getDirectMemory().percentUsage(),
brokerLoadData.getBandwidthIn().percentUsage(),
+ brokerLoadData.getBandwidthOut().percentUsage(),
brokerLoadData.getMaxResourceUsage() * 100);
+
+ // Count row.
+ rows[3] = new Object[COUNT_ROW.length];
+ initRow(rows[3], null, brokerLoadData.getBundleCount(),
+ null, null,
+ null, null);
+
+ // Latest message data row.
+ rows[5] = new Object[LATEST_ROW.length];
+ initMessageRow(rows[5], brokerLoadData.getMsgRateIn(),
brokerLoadData.getMsgRateOut(),
+ brokerLoadData.getMsgThroughputIn(),
brokerLoadData.getMsgThroughputOut());
+
+ final String table = localTableMaker.make(rows);
+ log.info("\nBroker Data for {}:\n{}\n", broker, table);
+ }
+
+    /**
+     * Prints a load-data table for each key/value pair in {@code brokerLoadDataTableView}.
+     */
+    private synchronized void printBrokerLoadDataStore() {
+        brokerLoadDataTableView.forEach((brokerName, data) -> printBrokerLoadData(brokerName, data));
+    }
+
+    /**
+     * Prints the broker load data store in an endless loop, sleeping
+     * {@code GLOBAL_STATS_PRINT_PERIOD_MILLIS} milliseconds before each print.
+     * The loop only ends when an exception is thrown.
+     *
+     * @throws RuntimeException wrapping the interruption or any exception raised while printing
+     */
+    private void startBrokerLoadDataStoreMonitor() {
+        try {
+            while (true) {
+                Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
+                printBrokerLoadDataStore();
+            }
+        } catch (InterruptedException ex) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ex);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
/**
* Run a monitor from command line arguments.
*
@@ -481,8 +556,15 @@ public class BrokerMonitor {
jc.usage();
PerfClientUtils.exit(1);
}
- final ZooKeeper zkClient = new ZooKeeper(arguments.connectString,
ZOOKEEPER_TIMEOUT_MILLIS, null);
- final BrokerMonitor monitor = new BrokerMonitor(zkClient);
- monitor.start();
+
+
+ if (arguments.extensions) {
+ final BrokerMonitor monitor = new
BrokerMonitor(arguments.connectString);
+ monitor.startBrokerLoadDataStoreMonitor();
+ } else {
+ final ZooKeeper zkClient = new ZooKeeper(arguments.connectString,
ZOOKEEPER_TIMEOUT_MILLIS, null);
+ final BrokerMonitor monitor = new BrokerMonitor(zkClient);
+ monitor.start();
+ }
}
}