This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 8339e846528 [cherry-pick][branch-2.9] Fix NPE when
ResourceGroupService execute scheduled task. (#18389)
8339e846528 is described below
commit 8339e846528c28eea6b716ae05552f2123854079
Author: congbo <[email protected]>
AuthorDate: Tue Nov 8 21:17:09 2022 +0800
[cherry-pick][branch-2.9] Fix NPE when ResourceGroupService execute
scheduled task. (#18389)
---
.../org/apache/pulsar/broker/PulsarService.java | 8 +++++
.../broker/resourcegroup/ResourceGroupService.java | 38 ++++++++++++++++++----
.../resourcegroup/ResourceGroupServiceTest.java | 8 +++++
3 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c10e6acc2a8..acbd1ea33e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -384,6 +384,14 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.resourceUsageTransportManager.close();
this.resourceUsageTransportManager = null;
}
+ if (this.resourceGroupServiceManager != null) {
+ try {
+ this.resourceGroupServiceManager.close();
+ } catch (Exception e) {
+ LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage());
+ }
+ this.resourceGroupServiceManager = null;
+ }
if (this.webService != null) {
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 4900027d218..53b5a7b96c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resourcegroup;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
*
* @see PulsarService
*/
-public class ResourceGroupService {
+public class ResourceGroupService implements AutoCloseable{
/**
* Default constructor.
*/
@@ -302,6 +303,21 @@ public class ResourceGroupService {
return this.namespaceToRGsMap.get(namespaceName);
}
+ @Override
+ public void close() throws Exception {
+ if (aggregateLocalUsagePeriodicTask != null) {
+ aggregateLocalUsagePeriodicTask.cancel(true);
+ }
+ if (calculateQuotaPeriodicTask != null) {
+ calculateQuotaPeriodicTask.cancel(true);
+ }
+ resourceGroupsMap.clear();
+ tenantToRGsMap.clear();
+ namespaceToRGsMap.clear();
+ topicProduceStats.clear();
+ topicConsumeStats.clear();
+ }
+
/**
* Increments usage stats for the resource groups associated with the given namespace and tenant.
* Expected to be called when a message is produced or consumed on a topic, or when we calculate
@@ -565,17 +581,17 @@ public class ResourceGroupService {
ServiceConfiguration config = pulsar.getConfiguration();
long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
- if (this.aggreagteLocalUsagePeriodicTask == null) {
+ if (this.aggregateLocalUsagePeriodicTask == null) {
log.error("aggregateResourceGroupLocalUsages: Unable to find
running task to cancel when "
+ "publish period changed from {} to {} {}",
this.aggregateLocalUsagePeriodInSeconds,
newPeriodInSeconds, timeUnitScale);
} else {
- boolean cancelStatus = this.aggreagteLocalUsagePeriodicTask.cancel(true);
+ boolean cancelStatus = this.aggregateLocalUsagePeriodicTask.cancel(true);
log.info("aggregateResourceGroupLocalUsages: Got status={} in
cancel of periodic "
+ "when publish period changed from {} to {}
{}",
cancelStatus, this.aggregateLocalUsagePeriodInSeconds,
newPeriodInSeconds, timeUnitScale);
}
- this.aggreagteLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
+ this.aggregateLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
newPeriodInSeconds,
newPeriodInSeconds,
@@ -680,7 +696,7 @@ public class ResourceGroupService {
ServiceConfiguration config = this.pulsar.getConfiguration();
long periodInSecs =
config.getResourceUsageTransportPublishIntervalInSecs();
this.aggregateLocalUsagePeriodInSeconds =
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
- this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
+ this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
periodInSecs,
periodInSecs,
@@ -737,7 +753,7 @@ public class ResourceGroupService {
// The task that periodically re-calculates the quota budget for local
usage.
- private ScheduledFuture<?> aggreagteLocalUsagePeriodicTask;
+ private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
private long aggregateLocalUsagePeriodInSeconds;
// The task that periodically re-calculates the quota budget for local
usage.
@@ -830,4 +846,14 @@ public class ResourceGroupService {
.name("pulsar_resource_group_calculate_quota_secs")
.help("Time required to calculate quota of all resource groups, in
seconds.")
.register();
+
+ @VisibleForTesting
+ ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
+ return this.aggregateLocalUsagePeriodicTask;
+ }
+
+ @VisibleForTesting
+ ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
+ return this.calculateQuotaPeriodicTask;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index c1d781f3920..ae0d6ee9381 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -260,6 +260,14 @@ public class ResourceGroupServiceTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(rgs.getNumResourceGroups(), 0);
}
+ @Test
+ public void testClose() throws Exception {
+ ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);
+ service.close();
+ Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled());
+ Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled());
+ }
+
private ResourceGroupService rgs;
int numAnonymousQuotaCalculations;