This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ed07c1eba7 [improve][cli] Use LoadManagerReport instead of Object (#22850)
5ed07c1eba7 is described below
commit 5ed07c1eba7d05a095f5a165742ca5275d6c673f
Author: crossoverJie <[email protected]>
AuthorDate: Wed Jun 19 02:23:50 2024 +0800
[improve][cli] Use LoadManagerReport instead of Object (#22850)
---
.../data/loadbalancer/LocalBrokerDataTest.java | 21 ++++++++++---
.../apache/pulsar/testclient/BrokerMonitor.java | 36 +++++++++++++---------
2 files changed, 38 insertions(+), 19 deletions(-)
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
index db55ecfe503..b5d7e3c355a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.policies.data.loadbalancer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -30,14 +31,26 @@ import static org.testng.Assert.assertFalse;
public class LocalBrokerDataTest {
@Test
- public void testLocalBrokerDataDeserialization() {
+ public void testLocalBrokerDataDeserialization() throws
JsonProcessingException {
+ ObjectReader LOAD_REPORT_READER =
ObjectMapperFactory.getMapper().reader()
+ .forType(LoadManagerReport.class);
String data =
"{\"webServiceUrl\":\"http://10.244.2.23:8080\",\"webServiceUrlTls\":\"https://10.244.2.23:8081\",\"pulsarServiceUrlTls\":\"pulsar+ssl://10.244.2.23:6651\",\"persistentTopicsEnabled\":true,\"nonPersistentTopicsEnabled\":false,\"cpu\":{\"usage\":3.1577712104798255,\"limit\":100.0},\"memory\":{\"usage\":614.0,\"limit\":1228.0},\"directMemory\":{\"usage\":32.0,\"limit\":1228.0},\"bandwidthIn\":{\"usage\":0.0,\"limit\":0.0},\"bandwidthOut\":{\"usage\":0.0,\"limit\":0.0}
[...]
- Gson gson = new Gson();
- LocalBrokerData localBrokerData = gson.fromJson(data,
LocalBrokerData.class);
+ LoadManagerReport localBrokerData = LOAD_REPORT_READER.readValue(data);
Assert.assertEquals(localBrokerData.getMemory().limit, 1228.0d,
0.0001f);
Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d,
0.0001f);
Assert.assertEquals(localBrokerData.getMemory().percentUsage(),
((float) localBrokerData.getMemory().usage) / ((float)
localBrokerData.getMemory().limit) * 100, 0.0001f);
}
+ @Test
+ public void testTimeAverageBrokerDataDataDeserialization() throws
JsonProcessingException {
+ ObjectReader TIME_AVERAGE_READER =
ObjectMapperFactory.getMapper().reader()
+ .forType(TimeAverageBrokerData.class);
+ String data =
"{\"shortTermMsgThroughputIn\":100,\"shortTermMsgThroughputOut\":200,\"shortTermMsgRateIn\":300,\"shortTermMsgRateOut\":400,\"longTermMsgThroughputIn\":567.891,\"longTermMsgThroughputOut\":678.912,\"longTermMsgRateIn\":789.123,\"longTermMsgRateOut\":890.123}";
+ TimeAverageBrokerData timeAverageBrokerData =
TIME_AVERAGE_READER.readValue(data);
+ assertEquals(timeAverageBrokerData.getShortTermMsgThroughputIn(),
100.00);
+ assertEquals(timeAverageBrokerData.getShortTermMsgThroughputOut(),
200.00);
+ assertEquals(timeAverageBrokerData.getShortTermMsgRateIn(), 300.00);
+ assertEquals(timeAverageBrokerData.getShortTermMsgRateOut(), 400.00);
+ }
@Test
public void testMaxResourceUsage() {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index a2f5b382c7b..6af4925a7c6 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.testclient;
import static
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.databind.ObjectReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -63,7 +65,11 @@ public class BrokerMonitor extends CmdBase {
private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
private ZooKeeper zkClient;
- private static final Gson gson = new Gson();
+ private static final ObjectReader LOAD_REPORT_READER =
ObjectMapperFactory.getMapper().reader()
+ .forType(LoadManagerReport.class);
+
+ private static final ObjectReader TIME_AVERAGE_READER =
ObjectMapperFactory.getMapper().reader()
+ .forType(TimeAverageBrokerData.class);
// Fields common for message rows.
private static final List<Object> MESSAGE_FIELDS = Arrays.asList("MSG/S
IN", "MSG/S OUT", "TOTAL", "KB/S IN",
@@ -85,7 +91,7 @@ public class BrokerMonitor extends CmdBase {
private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC
MSG");
private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE",
"MSG/S", "LONG/S", "KB/S", "MAX %" };
- private Map<String, Object> loadData;
+ private Map<String, LoadManagerReport> loadData;
private static final FixedColumnLengthTableMaker localTableMaker = new
FixedColumnLengthTableMaker();
@@ -146,9 +152,9 @@ public class BrokerMonitor extends CmdBase {
double totalLongTermMessageRate = 0;
double maxMaxUsage = 0;
int i = 1;
- for (final Map.Entry<String, Object> entry : loadData.entrySet()) {
+ for (final Map.Entry<String, LoadManagerReport> entry :
loadData.entrySet()) {
final String broker = entry.getKey();
- final Object data = entry.getValue();
+ final LoadManagerReport data = entry.getValue();
rows[i] = new Object[GLOBAL_HEADER.length];
rows[i][0] = broker;
int numBundles;
@@ -177,9 +183,8 @@ public class BrokerMonitor extends CmdBase {
messageRate = localData.getMsgRateIn() +
localData.getMsgRateOut();
final String timeAveragePath =
BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
try {
- final TimeAverageBrokerData timeAverageData =
gson.fromJson(
- new String(zkClient.getData(timeAveragePath,
false, null)),
- TimeAverageBrokerData.class);
+ final TimeAverageBrokerData timeAverageData =
TIME_AVERAGE_READER.readValue(
+ new String(zkClient.getData(timeAveragePath,
false, null)));
longTermMessageRate =
timeAverageData.getLongTermMsgRateIn()
+ timeAverageData.getLongTermMsgRateOut();
} catch (Exception x) {
@@ -307,20 +312,21 @@ public class BrokerMonitor extends CmdBase {
private synchronized void printData(final String path) {
final String broker = brokerNameFromPath(path);
String jsonString;
+ LoadManagerReport loadManagerReport;
try {
jsonString = new String(zkClient.getData(path, this, null));
+ loadManagerReport = LOAD_REPORT_READER.readValue(jsonString);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
- // Use presence of the String "allocated" to determine if this is
using SimpleLoadManagerImpl.
- if (jsonString.contains("allocated")) {
- printLoadReport(broker, gson.fromJson(jsonString,
LoadReport.class));
- } else {
- final LocalBrokerData localBrokerData =
gson.fromJson(jsonString, LocalBrokerData.class);
+ if (loadManagerReport instanceof LoadReport) {
+ printLoadReport(broker, (LoadReport) loadManagerReport);
+ } else {
+ final LocalBrokerData localBrokerData = (LocalBrokerData)
loadManagerReport;
final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH +
"/" + broker;
try {
- final TimeAverageBrokerData timeAverageData =
gson.fromJson(
- new String(zkClient.getData(timeAveragePath,
false, null)), TimeAverageBrokerData.class);
+ final TimeAverageBrokerData timeAverageData =
TIME_AVERAGE_READER.readValue(
+ new String(zkClient.getData(timeAveragePath,
false, null)));
printBrokerData(broker, localBrokerData, timeAverageData);
} catch (Exception e) {
throw new RuntimeException(e);