This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 579f22c8449 [improve][broker] PIP-192 Added --extensions option in BrokerMonitor (#19654)
579f22c8449 is described below

commit 579f22c8449be287ee1209a477aeaad346495289
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 1 01:02:11 2023 -0800

    [improve][broker] PIP-192 Added --extensions option in BrokerMonitor (#19654)
---
 .../apache/pulsar/testclient/BrokerMonitor.java    | 94 ++++++++++++++++++++--
 1 file changed, 88 insertions(+), 6 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index c209c34a3d7..3f896986016 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.testclient;
 
+import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -31,7 +32,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -54,7 +61,7 @@ public class BrokerMonitor {
     private static final String BROKER_ROOT = "/loadbalance/brokers";
     private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
     private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
-    private final ZooKeeper zkClient;
+    private ZooKeeper zkClient;
     private static final Gson gson = new Gson();
 
     // Fields common for message rows.
@@ -77,7 +84,7 @@ public class BrokerMonitor {
     private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC MSG");
     private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE", "MSG/S", "LONG/S", "KB/S", "MAX %" };
 
-    private final Map<String, Object> loadData;
+    private Map<String, Object> loadData;
 
     private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker();
     static {
@@ -434,8 +441,11 @@ public class BrokerMonitor {
         @Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
         boolean help;
 
-        @Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true)
+        @Parameter(names = { "--connect-string" }, description = "Zookeeper or broker connect string", required = true)
         public String connectString = null;
+
+        @Parameter(names = { "--extensions" }, description = "true to monitor Load Balance Extensions.")
+        boolean extensions = false;
     }
 
     /**
@@ -464,6 +474,71 @@ public class BrokerMonitor {
         }
     }
 
+    private TableView<BrokerLoadData> brokerLoadDataTableView;
+
+    private BrokerMonitor(String brokerServiceUrl) {
+        try {
+            PulsarClient client = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(brokerServiceUrl)
+                    .connectionsPerBroker(4)
+                    .ioThreads(Runtime.getRuntime().availableProcessors())
+                    .statsInterval(0, TimeUnit.SECONDS)
+                    .build();
+            this.brokerLoadDataTableView = client
+                    .newTableView(Schema.JSON(BrokerLoadData.class))
+                    .topic(BROKER_LOAD_DATA_STORE_TOPIC).create();
+        } catch (Throwable e) {
+            log.info("Failed to start BrokerMonitor", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private synchronized void printBrokerLoadData(final String broker, final BrokerLoadData brokerLoadData) {
+
+        // Initialize the constant rows.
+        final Object[][] rows = new Object[6][];
+        rows[0] = SYSTEM_ROW;
+        rows[2] = COUNT_ROW;
+        rows[4] = LATEST_ROW;
+
+        // First column is a label, so start at the second column at index 1.
+        // System row.
+        rows[1] = new Object[SYSTEM_ROW.length];
+        initRow(rows[1], brokerLoadData.getCpu().percentUsage(), brokerLoadData.getMemory().percentUsage(),
+                brokerLoadData.getDirectMemory().percentUsage(), brokerLoadData.getBandwidthIn().percentUsage(),
+                brokerLoadData.getBandwidthOut().percentUsage(), brokerLoadData.getMaxResourceUsage() * 100);
+
+        // Count row.
+        rows[3] = new Object[COUNT_ROW.length];
+        initRow(rows[3], null, brokerLoadData.getBundleCount(),
+                null, null,
+                null, null);
+
+        // Latest message data row.
+        rows[5] = new Object[LATEST_ROW.length];
+        initMessageRow(rows[5], brokerLoadData.getMsgRateIn(), brokerLoadData.getMsgRateOut(),
+                brokerLoadData.getMsgThroughputIn(), brokerLoadData.getMsgThroughputOut());
+
+        final String table = localTableMaker.make(rows);
+        log.info("\nBroker Data for {}:\n{}\n", broker, table);
+    }
+
+    private synchronized void printBrokerLoadDataStore() {
+        brokerLoadDataTableView.forEach(this::printBrokerLoadData);
+    }
+
+    private void startBrokerLoadDataStoreMonitor() {
+        try {
+            while (true) {
+                Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
+                printBrokerLoadDataStore();
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
     /**
      * Run a monitor from command line arguments.
      *
@@ -481,8 +556,15 @@ public class BrokerMonitor {
             jc.usage();
             PerfClientUtils.exit(1);
         }
-        final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
-        final BrokerMonitor monitor = new BrokerMonitor(zkClient);
-        monitor.start();
+
+
+        if (arguments.extensions) {
+            final BrokerMonitor monitor = new BrokerMonitor(arguments.connectString);
+            monitor.startBrokerLoadDataStoreMonitor();
+        } else {
+            final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
+            final BrokerMonitor monitor = new BrokerMonitor(zkClient);
+            monitor.start();
+        }
     }
 }

Reply via email to