codelipenghui commented on code in PR #17456:
URL: https://github.com/apache/pulsar/pull/17456#discussion_r972056579
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
assertEquals(data.printResourceUsage(),
"cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn:
30.00%, bandwidthOut: 20.00%");
}
+
+ @Test
+ public void testRangeThroughput() {
+ int numBundles = 10;
+ int brokerNum = 11;
+ int lowLoadNode = 10;
+ LoadData loadData = new LoadData();
+ double throughput = 100 * 1024 * 1024;
+ //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded
at 0%.
+ //At this time, the average load is 80*10/11 = 72.73, and the
threshold for rebalancing is 72.73 + 10 = 82.73.
+ //Since 80 < 82.73, rebalancing will not be trigger, and there is one
Broker with load of 0.
+ for (int i = 0; i < brokerNum; i++) {
+ LocalBrokerData broker = new LocalBrokerData();
+ for (int j = 0; j < numBundles; j++) {
+ broker.getBundles().add("bundle-" + j);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0
: throughput);
+ timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ?
0 : throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ String broker2BundleName = "broker-" + i + "-bundle-" + j;
+ loadData.getBundleData().put(broker2BundleName, bundle);
+ broker.getBundles().add(broker2BundleName);
+ }
+ broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80,
100));
+ broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 :
80, 100));
+ broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+ broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+ loadData.getBrokerData().put("broker-" + i, new
BrokerData(broker));
+ }
+ ThresholdShedder shedder = new ThresholdShedder();
+ Multimap<String, String> bundlesToUnload =
shedder.findBundlesForUnloading(loadData, conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ conf.setLowerBoundarySheddingEnabled(true);
+ bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData,
conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ }
+
+ @Test
+ public void testNoBrokerToOffload() {
+ int numBundles = 10;
+ int brokerNum = 11;
+ LoadData loadData = new LoadData();
+ double throughput = 80 * 1024 * 1024;
+ //Load of all Brokers are 80%, and no Broker needs to offload.
+ for (int i = 0; i < brokerNum; i++) {
+ LocalBrokerData broker = new LocalBrokerData();
+ for (int j = 0; j < numBundles; j++) {
+ broker.getBundles().add("bundle-" + j);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgThroughputIn(throughput);
+ timeAverageMessageData.setMsgThroughputOut(throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ String broker2BundleName = "broker-" + i + "-bundle-" + j;
+ loadData.getBundleData().put(broker2BundleName, bundle);
+ broker.getBundles().add(broker2BundleName);
+ }
+ broker.setBandwidthIn(new ResourceUsage(80, 100));
+ broker.setBandwidthOut(new ResourceUsage(80, 100));
+ broker.setMsgThroughputIn(throughput);
+ broker.setMsgThroughputOut(throughput);
+ loadData.getBrokerData().put("broker-" + i, new
BrokerData(broker));
+ }
+ ThresholdShedder shedder = new ThresholdShedder();
+ Multimap<String, String> bundlesToUnload =
shedder.findBundlesForUnloading(loadData, conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ conf.setLowerBoundarySheddingEnabled(true);
+ bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData,
conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ }
+
+ @Test
+ public void testBrokerWithOneBundle() {
Review Comment:
```suggestion
public void testLowerBoundarySheddingBrokerWithOneBundle() {
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedderTest.java:
##########
@@ -224,4 +225,114 @@ public void testPrintResourceUsage() {
assertEquals(data.printResourceUsage(),
"cpu: 10.00%, memory: 50.00%, directMemory: 90.00%, bandwidthIn:
30.00%, bandwidthOut: 20.00%");
}
+
+ @Test
+ public void testRangeThroughput() {
+ int numBundles = 10;
+ int brokerNum = 11;
+ int lowLoadNode = 10;
+ LoadData loadData = new LoadData();
+ double throughput = 100 * 1024 * 1024;
+ //There are 11 Brokers, of which 10 are loaded at 80% and 1 is loaded
at 0%.
+ //At this time, the average load is 80*10/11 = 72.73, and the
threshold for rebalancing is 72.73 + 10 = 82.73.
+ //Since 80 < 82.73, rebalancing will not be trigger, and there is one
Broker with load of 0.
+ for (int i = 0; i < brokerNum; i++) {
+ LocalBrokerData broker = new LocalBrokerData();
+ for (int j = 0; j < numBundles; j++) {
+ broker.getBundles().add("bundle-" + j);
+ BundleData bundle = new BundleData();
+ TimeAverageMessageData timeAverageMessageData = new
TimeAverageMessageData();
+ timeAverageMessageData.setMsgThroughputIn(i == lowLoadNode ? 0
: throughput);
+ timeAverageMessageData.setMsgThroughputOut(i == lowLoadNode ?
0 : throughput);
+ bundle.setShortTermData(timeAverageMessageData);
+ String broker2BundleName = "broker-" + i + "-bundle-" + j;
+ loadData.getBundleData().put(broker2BundleName, bundle);
+ broker.getBundles().add(broker2BundleName);
+ }
+ broker.setBandwidthIn(new ResourceUsage(i == lowLoadNode ? 0 : 80,
100));
+ broker.setBandwidthOut(new ResourceUsage(i == lowLoadNode ? 0 :
80, 100));
+ broker.setMsgThroughputIn(i == lowLoadNode ? 0 : throughput);
+ broker.setMsgThroughputOut(i == lowLoadNode ? 0 : throughput);
+ loadData.getBrokerData().put("broker-" + i, new
BrokerData(broker));
+ }
+ ThresholdShedder shedder = new ThresholdShedder();
+ Multimap<String, String> bundlesToUnload =
shedder.findBundlesForUnloading(loadData, conf);
+ assertTrue(bundlesToUnload.isEmpty());
+ conf.setLowerBoundarySheddingEnabled(true);
+ bundlesToUnload = thresholdShedder.findBundlesForUnloading(loadData,
conf);
+ assertFalse(bundlesToUnload.isEmpty());
+ }
+
+ @Test
+ public void testNoBrokerToOffload() {
Review Comment:
```suggestion
public void testLowerBoundarySheddingNoBrokerToOffload() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]