This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ed07c1eba7 [improve][cli] Use LoadManagerReport instead of Object (#22850)
5ed07c1eba7 is described below

commit 5ed07c1eba7d05a095f5a165742ca5275d6c673f
Author: crossoverJie <[email protected]>
AuthorDate: Wed Jun 19 02:23:50 2024 +0800

    [improve][cli] Use LoadManagerReport instead of Object (#22850)
---
 .../data/loadbalancer/LocalBrokerDataTest.java     | 21 ++++++++++---
 .../apache/pulsar/testclient/BrokerMonitor.java    | 36 +++++++++++++---------
 2 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
index db55ecfe503..b5d7e3c355a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.policies.data.loadbalancer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -30,14 +31,26 @@ import static org.testng.Assert.assertFalse;
 public class LocalBrokerDataTest {
 
     @Test
-    public void testLocalBrokerDataDeserialization() {
+    public void testLocalBrokerDataDeserialization() throws 
JsonProcessingException {
+        ObjectReader LOAD_REPORT_READER = 
ObjectMapperFactory.getMapper().reader()
+                .forType(LoadManagerReport.class);
         String data = 
"{\"webServiceUrl\":\"http://10.244.2.23:8080\",\"webServiceUrlTls\":\"https://10.244.2.23:8081\",\"pulsarServiceUrlTls\":\"pulsar+ssl://10.244.2.23:6651\",\"persistentTopicsEnabled\":true,\"nonPersistentTopicsEnabled\":false,\"cpu\":{\"usage\":3.1577712104798255,\"limit\":100.0},\"memory\":{\"usage\":614.0,\"limit\":1228.0},\"directMemory\":{\"usage\":32.0,\"limit\":1228.0},\"bandwidthIn\":{\"usage\":0.0,\"limit\":0.0},\"bandwidthOut\":{\"usage\":0.0,\"limit\":0.0}
 [...]
-        Gson gson = new Gson();
-        LocalBrokerData localBrokerData = gson.fromJson(data, 
LocalBrokerData.class);
+        LoadManagerReport localBrokerData = LOAD_REPORT_READER.readValue(data);
         Assert.assertEquals(localBrokerData.getMemory().limit, 1228.0d, 
0.0001f);
         Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d, 
0.0001f);
         Assert.assertEquals(localBrokerData.getMemory().percentUsage(), 
((float) localBrokerData.getMemory().usage) / ((float) 
localBrokerData.getMemory().limit) * 100, 0.0001f);
     }
+    @Test
+    public void testTimeAverageBrokerDataDataDeserialization() throws 
JsonProcessingException {
+        ObjectReader TIME_AVERAGE_READER = 
ObjectMapperFactory.getMapper().reader()
+                .forType(TimeAverageBrokerData.class);
+        String data = 
"{\"shortTermMsgThroughputIn\":100,\"shortTermMsgThroughputOut\":200,\"shortTermMsgRateIn\":300,\"shortTermMsgRateOut\":400,\"longTermMsgThroughputIn\":567.891,\"longTermMsgThroughputOut\":678.912,\"longTermMsgRateIn\":789.123,\"longTermMsgRateOut\":890.123}";
+        TimeAverageBrokerData timeAverageBrokerData = 
TIME_AVERAGE_READER.readValue(data);
+        assertEquals(timeAverageBrokerData.getShortTermMsgThroughputIn(), 
100.00);
+        assertEquals(timeAverageBrokerData.getShortTermMsgThroughputOut(), 
200.00);
+        assertEquals(timeAverageBrokerData.getShortTermMsgRateIn(), 300.00);
+        assertEquals(timeAverageBrokerData.getShortTermMsgRateOut(), 400.00);
+    }
 
     @Test
     public void testMaxResourceUsage() {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index a2f5b382c7b..6af4925a7c6 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.testclient;
 
 import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.databind.ObjectReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -63,7 +65,11 @@ public class BrokerMonitor extends CmdBase {
     private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
     private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
     private ZooKeeper zkClient;
-    private static final Gson gson = new Gson();
+    private static final ObjectReader LOAD_REPORT_READER = 
ObjectMapperFactory.getMapper().reader()
+            .forType(LoadManagerReport.class);
+
+    private static final ObjectReader TIME_AVERAGE_READER = 
ObjectMapperFactory.getMapper().reader()
+            .forType(TimeAverageBrokerData.class);
 
     // Fields common for message rows.
     private static final List<Object> MESSAGE_FIELDS = Arrays.asList("MSG/S 
IN", "MSG/S OUT", "TOTAL", "KB/S IN",
@@ -85,7 +91,7 @@ public class BrokerMonitor extends CmdBase {
     private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC 
MSG");
     private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE", 
"MSG/S", "LONG/S", "KB/S", "MAX %" };
 
-    private Map<String, Object> loadData;
+    private Map<String, LoadManagerReport> loadData;
 
     private static final FixedColumnLengthTableMaker localTableMaker = new 
FixedColumnLengthTableMaker();
 
@@ -146,9 +152,9 @@ public class BrokerMonitor extends CmdBase {
             double totalLongTermMessageRate = 0;
             double maxMaxUsage = 0;
             int i = 1;
-            for (final Map.Entry<String, Object> entry : loadData.entrySet()) {
+            for (final Map.Entry<String, LoadManagerReport> entry : 
loadData.entrySet()) {
                 final String broker = entry.getKey();
-                final Object data = entry.getValue();
+                final LoadManagerReport data = entry.getValue();
                 rows[i] = new Object[GLOBAL_HEADER.length];
                 rows[i][0] = broker;
                 int numBundles;
@@ -177,9 +183,8 @@ public class BrokerMonitor extends CmdBase {
                     messageRate = localData.getMsgRateIn() + 
localData.getMsgRateOut();
                     final String timeAveragePath = 
BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
                     try {
-                        final TimeAverageBrokerData timeAverageData = 
gson.fromJson(
-                                new String(zkClient.getData(timeAveragePath, 
false, null)),
-                                TimeAverageBrokerData.class);
+                        final TimeAverageBrokerData timeAverageData = 
TIME_AVERAGE_READER.readValue(
+                                new String(zkClient.getData(timeAveragePath, 
false, null)));
                         longTermMessageRate = 
timeAverageData.getLongTermMsgRateIn()
                                 + timeAverageData.getLongTermMsgRateOut();
                     } catch (Exception x) {
@@ -307,20 +312,21 @@ public class BrokerMonitor extends CmdBase {
         private synchronized void printData(final String path) {
             final String broker = brokerNameFromPath(path);
             String jsonString;
+            LoadManagerReport loadManagerReport;
             try {
                 jsonString = new String(zkClient.getData(path, this, null));
+                loadManagerReport = LOAD_REPORT_READER.readValue(jsonString);
             } catch (Exception ex) {
                 throw new RuntimeException(ex);
             }
-            // Use presence of the String "allocated" to determine if this is 
using SimpleLoadManagerImpl.
-            if (jsonString.contains("allocated")) {
-                printLoadReport(broker, gson.fromJson(jsonString, 
LoadReport.class));
-            } else {
-                final LocalBrokerData localBrokerData = 
gson.fromJson(jsonString, LocalBrokerData.class);
+            if (loadManagerReport instanceof LoadReport) {
+                printLoadReport(broker, (LoadReport) loadManagerReport);
+            } else  {
+                final LocalBrokerData localBrokerData = (LocalBrokerData) 
loadManagerReport;
                 final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + 
"/" + broker;
                 try {
-                    final TimeAverageBrokerData timeAverageData = 
gson.fromJson(
-                            new String(zkClient.getData(timeAveragePath, 
false, null)), TimeAverageBrokerData.class);
+                    final TimeAverageBrokerData timeAverageData = 
TIME_AVERAGE_READER.readValue(
+                            new String(zkClient.getData(timeAveragePath, 
false, null)));
                     printBrokerData(broker, localBrokerData, timeAverageData);
                 } catch (Exception e) {
                     throw new RuntimeException(e);

Reply via email to