This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 42ccecf72af Fix NPE when ResourceGroupService execute scheduled task. 
(#17840)
42ccecf72af is described below

commit 42ccecf72af2d134cd118694b88eeae7e7f4357b
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Sep 28 21:55:26 2022 +0800

    Fix NPE when ResourceGroupService execute scheduled task. (#17840)
    
    (cherry picked from commit 62d900f6792b6540bf0b992cd6e868b8bea6231c)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 ++++
 .../broker/resourcegroup/ResourceGroupService.java | 48 +++++++++++++++++++---
 .../resourcegroup/ResourceGroupServiceTest.java    |  8 ++++
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 262ada116a6..73f49670fc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -408,6 +408,14 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 }
                 this.resourceUsageTransportManager = null;
             }
+            if (this.resourceGroupServiceManager != null) {
+                try {
+                    this.resourceGroupServiceManager.close();
+                } catch (Exception e) {
+                    LOG.warn("ResourceGroupServiceManager closing failed {}", 
e.getMessage());
+                }
+                this.resourceGroupServiceManager = null;
+            }
 
             if (this.webService != null) {
                 try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 79cba28d374..c74681fdb73 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.resourcegroup;
 
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Summary;
 import java.util.Map;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
  *
  * @see PulsarService
  */
-public class ResourceGroupService {
+public class ResourceGroupService implements AutoCloseable{
     /**
      * Default constructor.
      */
@@ -302,6 +303,21 @@ public class ResourceGroupService {
         return this.namespaceToRGsMap.get(namespaceName);
     }
 
+    @Override
+    public void close() throws Exception {
+        if (aggregateLocalUsagePeriodicTask != null) {
+            aggregateLocalUsagePeriodicTask.cancel(true);
+        }
+        if (calculateQuotaPeriodicTask != null) {
+            calculateQuotaPeriodicTask.cancel(true);
+        }
+        resourceGroupsMap.clear();
+        tenantToRGsMap.clear();
+        namespaceToRGsMap.clear();
+        topicProduceStats.clear();
+        topicConsumeStats.clear();
+    }
+
     /**
      * Increments usage stats for the resource groups associated with the 
given namespace and tenant.
      * Expected to be called when a message is produced or consumed on a 
topic, or when we calculate
@@ -564,17 +580,17 @@ public class ResourceGroupService {
         ServiceConfiguration config = pulsar.getConfiguration();
         long newPeriodInSeconds = 
config.getResourceUsageTransportPublishIntervalInSecs();
         if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
-            if (this.aggreagteLocalUsagePeriodicTask == null) {
+            if (this.aggregateLocalUsagePeriodicTask == null) {
                 log.error("aggregateResourceGroupLocalUsages: Unable to find 
running task to cancel when "
                                 + "publish period changed from {} to {} {}",
                         this.aggregateLocalUsagePeriodInSeconds, 
newPeriodInSeconds, timeUnitScale);
             } else {
-                boolean cancelStatus = 
this.aggreagteLocalUsagePeriodicTask.cancel(true);
+                boolean cancelStatus = 
this.aggregateLocalUsagePeriodicTask.cancel(true);
                 log.info("aggregateResourceGroupLocalUsages: Got status={} in 
cancel of periodic "
                                 + "when publish period changed from {} to {} 
{}",
                         cancelStatus, this.aggregateLocalUsagePeriodInSeconds, 
newPeriodInSeconds, timeUnitScale);
             }
-            this.aggreagteLocalUsagePeriodicTask = 
pulsar.getExecutor().scheduleAtFixedRate(
+            this.aggregateLocalUsagePeriodicTask = 
pulsar.getExecutor().scheduleAtFixedRate(
                     
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
                     newPeriodInSeconds,
                     newPeriodInSeconds,
@@ -679,7 +695,7 @@ public class ResourceGroupService {
         ServiceConfiguration config = this.pulsar.getConfiguration();
         long periodInSecs = 
config.getResourceUsageTransportPublishIntervalInSecs();
         this.aggregateLocalUsagePeriodInSeconds = 
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
-        this.aggreagteLocalUsagePeriodicTask = 
this.pulsar.getExecutor().scheduleAtFixedRate(
+        this.aggregateLocalUsagePeriodicTask = 
this.pulsar.getExecutor().scheduleAtFixedRate(
                     
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
                     periodInSecs,
                     periodInSecs,
@@ -736,7 +752,7 @@ public class ResourceGroupService {
 
 
     // The task that periodically re-calculates the quota budget for local 
usage.
-    private ScheduledFuture<?> aggreagteLocalUsagePeriodicTask;
+    private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
     private long aggregateLocalUsagePeriodInSeconds;
 
     // The task that periodically re-calculates the quota budget for local 
usage.
@@ -829,4 +845,24 @@ public class ResourceGroupService {
             .name("pulsar_resource_group_calculate_quota_secs")
             .help("Time required to calculate quota of all resource groups, in 
seconds.")
             .register();
+
+    @VisibleForTesting
+    ConcurrentHashMap getTopicConsumeStats() {
+        return this.topicConsumeStats;
+    }
+
+    @VisibleForTesting
+    ConcurrentHashMap getTopicProduceStats() {
+        return this.topicProduceStats;
+    }
+
+    @VisibleForTesting
+    ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
+        return this.aggregateLocalUsagePeriodicTask;
+    }
+
+    @VisibleForTesting
+    ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
+        return this.calculateQuotaPeriodicTask;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index e0e3ec9c16a..86dff398f97 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -257,6 +257,14 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertEquals(rgs.getNumResourceGroups(), 0);
     }
 
+    @Test
+    public void testClose() throws Exception {
+        ResourceGroupService service = new ResourceGroupService(pulsar, 
TimeUnit.MILLISECONDS, null, null);
+        service.close();
+        
Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled());
+        
Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled());
+    }
+
     private ResourceGroupService rgs;
     int numAnonymousQuotaCalculations;
 

Reply via email to