This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6da3c151feb [fix][broker] usedLocallySinceLastReport should always be reset (#22672)
6da3c151feb is described below
commit 6da3c151febec9e0ab377207c8e0bbc667acb9c3
Author: Zixuan Liu <[email protected]>
AuthorDate: Thu May 9 09:42:17 2024 +0800
[fix][broker] usedLocallySinceLastReport should always be reset (#22672)
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 8f015d89e5d246325ae5cada02c4af3017a97ed9)
---
.../pulsar/broker/resourcegroup/ResourceGroup.java | 3 +-
.../ResourceGroupReportLocalUsageTest.java | 50 ++++++++++++++--------
2 files changed, 34 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index 27ce07f26c4..1b6f939c1ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -454,14 +454,13 @@ public class ResourceGroup {
bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
messagesUsed = monEntity.usedLocallySinceLastReport.messages;
-
+ monEntity.usedLocallySinceLastReport.bytes =
monEntity.usedLocallySinceLastReport.messages = 0;
if (sendReport) {
p.setBytesPerPeriod(bytesUsed);
p.setMessagesPerPeriod(messagesUsed);
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
- monEntity.usedLocallySinceLastReport.bytes =
monEntity.usedLocallySinceLastReport.messages = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch =
System.currentTimeMillis();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java
index 476e08003bb..7ba0ba86d12 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java
@@ -72,34 +72,50 @@ public class ResourceGroupReportLocalUsageTest extends MockedPulsarServiceBaseTe
rgConfig.setPublishRateInMsgs(2000);
service.resourceGroupCreate(rgName, rgConfig);
- org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup =
service.resourceGroupGet(rgName);
BytesAndMessagesCount bytesAndMessagesCount = new
BytesAndMessagesCount();
bytesAndMessagesCount.bytes = 20;
bytesAndMessagesCount.messages = 10;
-
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish,
bytesAndMessagesCount);
+
+ org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup =
service.resourceGroupGet(rgName);
+ for (ResourceGroupMonitoringClass value :
ResourceGroupMonitoringClass.values()) {
+ resourceGroup.incrementLocalUsageStats(value,
bytesAndMessagesCount);
+ }
+
+ // Case1: Suppress report ResourceUsage.
+ needReport.set(false);
ResourceUsage resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertFalse(resourceUsage.hasDispatch());
assertFalse(resourceUsage.hasPublish());
+ for (ResourceGroupMonitoringClass value :
ResourceGroupMonitoringClass.values()) {
+ PerMonitoringClassFields monitoredEntity =
+ resourceGroup.getMonitoredEntity(value);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.messages,
0);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0);
+ assertEquals(monitoredEntity.totalUsedLocally.messages, 0);
+ assertEquals(monitoredEntity.totalUsedLocally.bytes, 0);
+ assertEquals(monitoredEntity.lastReportedValues.messages, 0);
+ assertEquals(monitoredEntity.lastReportedValues.bytes, 0);
+ }
- PerMonitoringClassFields publishMonitoredEntity =
-
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
-
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages,
bytesAndMessagesCount.messages);
- assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes,
bytesAndMessagesCount.bytes);
- assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
- assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
- assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
- assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);
-
+ // Case2: Report ResourceUsage.
+ for (ResourceGroupMonitoringClass value :
ResourceGroupMonitoringClass.values()) {
+ resourceGroup.incrementLocalUsageStats(value,
bytesAndMessagesCount);
+ }
needReport.set(true);
+ resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertTrue(resourceUsage.hasDispatch());
assertTrue(resourceUsage.hasPublish());
-
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
- assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes,
0);
- assertEquals(publishMonitoredEntity.totalUsedLocally.messages,
bytesAndMessagesCount.messages);
- assertEquals(publishMonitoredEntity.totalUsedLocally.bytes,
bytesAndMessagesCount.bytes);
- assertEquals(publishMonitoredEntity.lastReportedValues.messages,
bytesAndMessagesCount.messages);
- assertEquals(publishMonitoredEntity.lastReportedValues.bytes,
bytesAndMessagesCount.bytes);
+ for (ResourceGroupMonitoringClass value :
ResourceGroupMonitoringClass.values()) {
+ PerMonitoringClassFields monitoredEntity =
+ resourceGroup.getMonitoredEntity(value);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.messages,
0);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0);
+ assertEquals(monitoredEntity.totalUsedLocally.messages,
bytesAndMessagesCount.messages);
+ assertEquals(monitoredEntity.totalUsedLocally.bytes,
bytesAndMessagesCount.bytes);
+ assertEquals(monitoredEntity.lastReportedValues.messages,
bytesAndMessagesCount.messages);
+ assertEquals(monitoredEntity.lastReportedValues.bytes,
bytesAndMessagesCount.bytes);
+ }
}
}
\ No newline at end of file