This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 06f7142d32d [fix][broker] Run ResourceGroup tasks only when 
tenants/namespaces registered (#24859)
06f7142d32d is described below

commit 06f7142d32d66a31d05cbe37d57e8bcaf44dce07
Author: Vinkal <[email protected]>
AuthorDate: Fri Oct 31 17:01:25 2025 +0530

    [fix][broker] Run ResourceGroup tasks only when tenants/namespaces 
registered (#24859)
    
    Signed-off-by: Vinkal Chudgar <[email protected]>
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +-
 .../broker/resourcegroup/ResourceGroupService.java | 113 ++++++--
 .../resourcegroup/ResourceGroupServiceTest.java    | 302 ++++++++++++++++++++-
 3 files changed, 396 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 30fef55ece3..e63c7d2b286 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1241,7 +1241,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(
             dynamic = true,
             category = CATEGORY_POLICIES,
-            doc = "Default interval to publish usage reports if 
resourceUsagePublishToTopic is enabled."
+            doc = "Interval (in seconds) for ResourceGroupService periodic 
tasks while resource groups are actively "
+                    + "attached to tenants or namespaces. Periodic tasks start 
automatically when the first attachment "
+                    + "is registered and stop automatically when no 
attachments remain. "
+                    + "If a ResourceUsageTransportManager is configured (see 
resourceUsageTransportClassName), "
+                    + "this interval also controls how frequently, usage 
reports are published for cross-broker "
+                    + "coordination. Dynamic changes take effect at runtime 
and reschedule any running tasks."
     )
     private int resourceUsageTransportPublishIntervalInSecs = 60;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 29633ab19fe..379f9f870ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -213,6 +213,9 @@ public class ResourceGroupService implements AutoCloseable{
         // Associate this tenant name with the RG.
         this.tenantToRGsMap.put(tenantName, rg);
         rgTenantRegisters.labels(resourceGroupName).inc();
+
+        // Ensure schedulers are started if this is the first registration.
+        maybeStartSchedulers();
     }
 
     /**
@@ -235,6 +238,9 @@ public class ResourceGroupService implements AutoCloseable{
         // Dissociate this tenant name from the RG.
         this.tenantToRGsMap.remove(tenantName, rg);
         rgTenantUnRegisters.labels(resourceGroupName).inc();
+
+        // If this was the last registration (tenant or namespace), stop 
schedulers.
+        maybeStopSchedulersIfIdle();
     }
 
     /**
@@ -266,6 +272,9 @@ public class ResourceGroupService implements AutoCloseable{
         // Associate this NS-name with the RG.
         this.namespaceToRGsMap.put(fqNamespaceName, rg);
         rgNamespaceRegisters.labels(resourceGroupName).inc();
+
+        // Ensure schedulers are started if this is the first registration.
+        maybeStartSchedulers();
     }
 
     /**
@@ -290,6 +299,9 @@ public class ResourceGroupService implements AutoCloseable{
         // Dissociate this NS-name from the RG.
         this.namespaceToRGsMap.remove(fqNamespaceName, rg);
         rgNamespaceUnRegisters.labels(resourceGroupName).inc();
+
+        // If this was the last registration (tenant or namespace), stop 
schedulers.
+        maybeStopSchedulersIfIdle();
     }
 
     /**
@@ -306,10 +318,16 @@ public class ResourceGroupService implements 
AutoCloseable{
     public void close() throws Exception {
         if (aggregateLocalUsagePeriodicTask != null) {
             aggregateLocalUsagePeriodicTask.cancel(true);
+            aggregateLocalUsagePeriodicTask = null;
         }
         if (calculateQuotaPeriodicTask != null) {
             calculateQuotaPeriodicTask.cancel(true);
+            calculateQuotaPeriodicTask = null;
         }
+
+        // Ensure the flag is consistent with the stopped state.
+        schedulersRunning.set(false);
+
         resourceGroupsMap.clear();
         tenantToRGsMap.clear();
         namespaceToRGsMap.clear();
@@ -540,6 +558,9 @@ public class ResourceGroupService implements AutoCloseable{
     // Periodically aggregate the usage from all topics known to the 
BrokerService.
     // Visibility for unit testing.
     protected void aggregateResourceGroupLocalUsages() {
+        if (!shouldRunPeriodicTasks()) {
+            return;
+        }
         final Summary.Timer aggrUsageTimer = 
rgUsageAggregationLatency.startTimer();
         BrokerService bs = this.pulsar.getBrokerService();
         Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
@@ -578,7 +599,7 @@ public class ResourceGroupService implements AutoCloseable{
         // cancel and re-schedule this task if the period of execution has 
changed.
         ServiceConfiguration config = pulsar.getConfiguration();
         long newPeriodInSeconds = 
config.getResourceUsageTransportPublishIntervalInSecs();
-        if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
+        if (schedulersRunning.get() && newPeriodInSeconds != 
this.aggregateLocalUsagePeriodInSeconds) {
             if (this.aggregateLocalUsagePeriodicTask == null) {
                 log.error("aggregateResourceGroupLocalUsages: Unable to find 
running task to cancel when "
                                 + "publish period changed from {} to {} {}",
@@ -602,6 +623,9 @@ public class ResourceGroupService implements AutoCloseable{
     // from the reports received from other brokers.
     // [Visibility for unit testing.]
     protected void calculateQuotaForAllResourceGroups() {
+        if (!shouldRunPeriodicTasks()) {
+            return;
+        }
         // Calculate the quota for the next window for this RG, based on the 
observed usage.
         final Summary.Timer quotaCalcTimer = 
rgQuotaCalculationLatency.startTimer();
         BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount();
@@ -668,7 +692,7 @@ public class ResourceGroupService implements AutoCloseable{
         // cancel and re-schedule this task if the period of execution has 
changed.
         ServiceConfiguration config = pulsar.getConfiguration();
         long newPeriodInSeconds = 
config.getResourceUsageTransportPublishIntervalInSecs();
-        if (newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) {
+        if (schedulersRunning.get() && newPeriodInSeconds != 
this.resourceUsagePublishPeriodInSeconds) {
             if (this.calculateQuotaPeriodicTask == null) {
                 log.error("calculateQuotaForAllResourceGroups: Unable to find 
running task to cancel when "
                                 + "publish period changed from {} to {} {}",
@@ -690,23 +714,72 @@ public class ResourceGroupService implements 
AutoCloseable{
         }
     }
 
-    private void initialize() {
-        ServiceConfiguration config = this.pulsar.getConfiguration();
-        long periodInSecs = 
config.getResourceUsageTransportPublishIntervalInSecs();
-        this.aggregateLocalUsagePeriodInSeconds = 
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
-        this.aggregateLocalUsagePeriodicTask = 
this.pulsar.getExecutor().scheduleAtFixedRate(
+    // Returns true if at least one tenant or namespace is registered to 
resource group.
+    private boolean hasActiveResourceGroups() {
+        return !tenantToRGsMap.isEmpty() || !namespaceToRGsMap.isEmpty();
+    }
+
+    /**
+     * Whether the periodic ResourceGroupService tasks (aggregation & quota 
calculation) should run.
+     * True only when:
+     *  1. the scheduler flag is set,
+     *  2. at least one Resource Group exists locally, and
+     *  3. at least one tenant or namespace is registered to Resource Group.
+     */
+    private boolean shouldRunPeriodicTasks() {
+        return schedulersRunning.get()
+                && !resourceGroupsMap.isEmpty()
+                && hasActiveResourceGroups();
+    }
+
+    // Start periodic aggregation/quota schedulers if we actually need them.
+    private void maybeStartSchedulers() {
+        if (!hasActiveResourceGroups()) {
+            return;
+        }
+        if (schedulersRunning.compareAndSet(false, true)) {
+            final long periodInSecs = 
pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
+            this.aggregateLocalUsagePeriodInSeconds = 
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
+            this.aggregateLocalUsagePeriodicTask = 
pulsar.getExecutor().scheduleAtFixedRate(
                     
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
-                    periodInSecs,
-                    periodInSecs,
-                    this.timeUnitScale);
-        this.calculateQuotaPeriodicTask = 
this.pulsar.getExecutor().scheduleAtFixedRate(
+                    periodInSecs, periodInSecs, timeUnitScale);
+            this.calculateQuotaPeriodicTask = 
pulsar.getExecutor().scheduleAtFixedRate(
                     
catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups),
-                    periodInSecs,
-                    periodInSecs,
-                    this.timeUnitScale);
-        maxIntervalForSuppressingReportsMSecs =
-                
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * 
MaxUsageReportSuppressRounds;
+                    periodInSecs, periodInSecs, timeUnitScale);
+            maxIntervalForSuppressingReportsMSecs =
+                    
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * 
MaxUsageReportSuppressRounds;
+            if (log.isInfoEnabled()) {
+                log.info("Started ResourceGroupService periodic tasks with 
period={} {}", periodInSecs, timeUnitScale);
+            }
+        }
+    }
+    // Stop schedulers when no tenant or namespace registrations remain.
+    private void maybeStopSchedulersIfIdle() {
+        if (hasActiveResourceGroups()) {
+            return;
+        }
+        if (schedulersRunning.compareAndSet(true, false)) {
+            if (aggregateLocalUsagePeriodicTask != null) {
+                aggregateLocalUsagePeriodicTask.cancel(true);
+                aggregateLocalUsagePeriodicTask = null;
+            }
+            if (calculateQuotaPeriodicTask != null) {
+                calculateQuotaPeriodicTask.cancel(true);
+                calculateQuotaPeriodicTask = null;
+            }
+            if (log.isInfoEnabled()) {
+                log.info("Stopped ResourceGroupService periodic tasks because 
no registrations remain");
+            }
+        }
+    }
 
+    private void initialize() {
+        // Store the configured interval. Do not start periodic tasks 
unconditionally here.
+        // Schedulers are started by maybeStartSchedulers() when the first 
tenant/namespace is registered.
+        final long periodInSecs = 
pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
+        this.aggregateLocalUsagePeriodInSeconds = 
this.resourceUsagePublishPeriodInSeconds = periodInSecs;
+        // if any tenant/namespace registrations already exist, 
maybeStartSchedulers() will start the schedulers now.
+        maybeStartSchedulers();
     }
 
     private void checkRGCreateParams(String rgName, 
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
@@ -761,6 +834,9 @@ public class ResourceGroupService implements AutoCloseable{
     // Allow a pluggable scale on time units; for testing periodic 
functionality.
     private TimeUnit timeUnitScale;
 
+    private final java.util.concurrent.atomic.AtomicBoolean schedulersRunning =
+            new java.util.concurrent.atomic.AtomicBoolean(false);
+
     // The maximum number of successive rounds that we can suppress reporting 
local usage, because there was no
     // substantial change from the prior round. This is to ensure the 
reporting does not become too chatty.
     // Set this value to one more than the cadence of sending reports; e.g., 
if you want to send every 3rd report,
@@ -864,4 +940,9 @@ public class ResourceGroupService implements AutoCloseable{
     ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
         return this.calculateQuotaPeriodicTask;
     }
+
+    @VisibleForTesting
+    boolean isSchedulersRunning() {
+        return schedulersRunning.get();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
index 63b483cef5e..534492894cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java
@@ -228,9 +228,6 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertTrue(myBoolSet.contains(false));
         }
 
-        rgs.unRegisterTenant(rgName, tenantName);
-        rgs.unRegisterNameSpace(rgName, tenantAndNamespace);
-
         BytesAndMessagesCount publishQuota = 
rgs.getPublishRateLimiters(rgName);
 
         // Calculated quota is synthetically set to the number of 
quota-calculation callbacks.
@@ -241,7 +238,7 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(publishQuota.bytes, 
rgConfig.getPublishRateInBytes().longValue());
         }
 
-        // Calculate the quota synchronously to avoid waiting for a periodic 
call within ResourceGroupService.
+        // Calculate the quota synchronously while an attachment still exists
         rgs.calculateQuotaForAllResourceGroups();
         publishQuota = rgs.getPublishRateLimiters(rgName);
         // The bytes/messages are (synthetically) set from 
numAnonymousQuotaCalculations in the above round of
@@ -250,18 +247,57 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertTrue(publishQuota.messages > 0 && publishQuota.messages 
<= numAnonymousQuotaCalculations);
         Assert.assertTrue(publishQuota.bytes > 0 &&  publishQuota.bytes <= 
numAnonymousQuotaCalculations);
 
+        // Now it is safe to detach. After this point the service is 
intentionally idle.
+        rgs.unRegisterTenant(rgName, tenantName);
+        rgs.unRegisterNameSpace(rgName, tenantAndNamespace);
+
         rgs.resourceGroupDelete(rgName);
         Assert.assertThrows(PulsarAdminException.class, () -> 
rgs.getPublishRateLimiters(rgName));
 
         Assert.assertEquals(rgs.getNumResourceGroups(), 0);
     }
 
-    @Test
+    /**
+     * Validates that ResourceGroupService#close() cancels scheduled tasks, 
clears futures and state.
+     * Steps:
+     * 1) Start periodic tasks by creating a resource group and attaching a 
namespace.
+     * 2) Assert both futures are non-null (tasks are scheduled) and the 
schedulersRunning flag is true.
+     * 3) Let try-with-resources close the service, then assert both futures 
are null, schedulersRunning is false,
+     *    and the resource group map is cleared.
+     */
+    @Test(timeOut = 60000)
     public void testClose() throws Exception {
-        ResourceGroupService service = new ResourceGroupService(pulsar, 
TimeUnit.MILLISECONDS, null, null);
-        service.close();
-        
Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled());
-        
Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled());
+        final String rg = "rg-close";
+        final NamespaceName ns = NamespaceName.get("t-close/ns-close");
+
+        // Create the service once so we can assert state after the 
try-with-resources closes it.
+        ResourceGroupService service = createResourceGroupService();
+        try (ResourceGroupService ignored = service) {
+            // Start the periodic tasks: create RG and attach a namespace.
+            service.resourceGroupCreate(rg, new 
org.apache.pulsar.common.policies.data.ResourceGroup());
+            service.registerNameSpace(rg, ns);
+
+            // Ensure tasks are started
+            Assert.assertNotNull(service.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should be scheduled.");
+            Assert.assertNotNull(service.getCalculateQuotaPeriodicTask(),
+                    "Quota task should be scheduled.");
+            Assert.assertTrue(service.isSchedulersRunning(),
+                    "SchedulersRunning flag should be true when tasks are 
active.");
+
+            // Do not call service.close() here. The try-with-resources will 
close it and ensure cleanup.
+        }
+
+        // After the try-with-resources block, service.close() has been 
invoked automatically.
+        // Postconditions: futures cleared to null, internal state cleared, 
flag off.
+        Assert.assertNull(service.getAggregateLocalUsagePeriodicTask(),
+                "Aggregate task future must be null after close().");
+        Assert.assertNull(service.getCalculateQuotaPeriodicTask(),
+                "Quota task future must be null after close().");
+        Assert.assertEquals(service.getNumResourceGroups(), 0,
+                "Resource group map should be cleared by close().");
+        Assert.assertFalse(service.isSchedulersRunning(),
+                "SchedulersRunning flag should be false after close().");
     }
 
     private ResourceGroupService rgs;
@@ -274,4 +310,252 @@ public class ResourceGroupServiceTest extends 
MockedPulsarServiceBaseTest {
         
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
         admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
     }
+    /**
+     * Helper method to create a fresh ResourceGroupService instance for 
testing.
+     * Each test should create its own instance to ensure isolation.
+     */
+    private ResourceGroupService createResourceGroupService() {
+        return new ResourceGroupService(pulsar);
+    }
+
+    /**
+     * Validates the lazy scheduling lifecycle and deterministic rescheduling 
of ResourceGroupService.
+     * Asserts that:
+     * 1) On cold start, and after creating a Resource Group (RG) without any 
attachments,
+     * no periodic tasks are scheduled.
+     * 2) Registering the first attachment (tenant or namespace) starts both 
periodic tasks.
+     * 3) Updating the publish interval causes rescheduling
+     *    - calling aggregateResourceGroupLocalUsages() reschedules only the 
aggregation task;
+     *    - calling calculateQuotaForAllResourceGroups() reschedules only the 
quota-calculation task.
+     * 4) When the last attachment is unregistered (i.e., no tenants or 
namespaces remain attached to any RG),
+     *    both periodic tasks are cancelled and their ScheduledFuture fields 
are cleared.
+     */
+
+    @Test(timeOut = 60000)
+    public void testLazyStartStopAndReschedule() throws Exception {
+        final String rgName = "rg-lazy";
+        final NamespaceName ns = NamespaceName.get("t-lazy/ns-lazy");
+        final int oldInterval = 
this.conf.getResourceUsageTransportPublishIntervalInSecs();
+
+        try (ResourceGroupService rgs = createResourceGroupService()) {
+            // Cold start: nothing scheduled
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should be null on cold start.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should be null on cold start.");
+
+            // Create a resource group but do not attach yet. There should 
still be nothing scheduled.
+            rgs.resourceGroupCreate(rgName, new 
org.apache.pulsar.common.policies.data.ResourceGroup());
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should remain null without attachments.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should remain null without attachments.");
+
+            // Attach a namespace. Both schedulers must start.
+            rgs.registerNameSpace(rgName, ns);
+            Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should start on first attachment.");
+            Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should start on first attachment.");
+            
Assert.assertFalse(rgs.getAggregateLocalUsagePeriodicTask().isCancelled(),
+                    "Aggregate task should be running.");
+            
Assert.assertFalse(rgs.getCalculateQuotaPeriodicTask().isCancelled(),
+                    "Quota task should be running.");
+
+            // Capture current scheduled futures
+            java.util.concurrent.ScheduledFuture<?> oldAgg = 
rgs.getAggregateLocalUsagePeriodicTask();
+            java.util.concurrent.ScheduledFuture<?> oldCalc = 
rgs.getCalculateQuotaPeriodicTask();
+
+            // Change publish interval dynamically
+            int newPeriod = oldInterval + 1;
+            
this.conf.setResourceUsageTransportPublishIntervalInSecs(newPeriod);
+
+            // Trigger aggregate reschedule
+            rgs.aggregateResourceGroupLocalUsages();
+            java.util.concurrent.ScheduledFuture<?> midAgg = 
rgs.getAggregateLocalUsagePeriodicTask();
+            java.util.concurrent.ScheduledFuture<?> midCalc = 
rgs.getCalculateQuotaPeriodicTask();
+
+            Assert.assertNotSame(midAgg, oldAgg, "Aggregate task should be 
rescheduled with a new future.");
+            Assert.assertTrue(oldAgg.isCancelled(), "Old aggregate task should 
be cancelled on reschedule.");
+            Assert.assertSame(midCalc, oldCalc, "Quota task should not be 
rescheduled by aggregate path.");
+            Assert.assertFalse(oldCalc.isCancelled(),
+                    "Old quota task should still be active before its own 
reschedule.");
+            Assert.assertFalse(midAgg.isCancelled(), "New aggregate task 
should be active.");
+
+            // Now trigger calculate reschedule
+            rgs.calculateQuotaForAllResourceGroups();
+            java.util.concurrent.ScheduledFuture<?> newAgg = 
rgs.getAggregateLocalUsagePeriodicTask();
+            java.util.concurrent.ScheduledFuture<?> newCalc = 
rgs.getCalculateQuotaPeriodicTask();
+
+            Assert.assertSame(newAgg, midAgg,
+                    "Aggregate task was already rescheduled. Future should 
remain the same.");
+            Assert.assertNotSame(newCalc, oldCalc, "Quota task should be 
rescheduled with a new future.");
+            Assert.assertTrue(oldCalc.isCancelled(), "Old quota task should be 
cancelled on its reschedule.");
+            Assert.assertFalse(newCalc.isCancelled(), "New quota task should 
be active.");
+
+            // Detach the last attachment. Schedulers must stop and futures 
should be cleared.
+            rgs.unRegisterNameSpace(rgName, ns);
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should be cleared after last detach.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should be cleared after last detach.");
+
+            // Cleanup resource group
+            rgs.resourceGroupDelete(rgName);
+            Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group 
should be deleted.");
+        } finally {
+            
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval);
+        }
+    }
+
+    /**
+     * Verifies that creating a ResourceGroup without tenant or namespace 
attachments
+     * does NOT trigger scheduler initialization. This ensures the lazy-start 
optimization
+     * correctly avoids unnecessary overhead when ResourceGroups are 
configured but unused.
+     */
+    @Test(timeOut = 60000)
+    public void testNoStartOnRGCreateOnly() throws Exception {
+        final String rg = "rg-create-only";
+        final int oldInterval = 
this.conf.getResourceUsageTransportPublishIntervalInSecs();
+
+        try (ResourceGroupService rgs = createResourceGroupService()) {
+            // No tasks at cold start
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should be null at cold start.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task 
should be null at cold start.");
+
+            // Create resource group without attachments. There should still 
be nothing scheduled.
+            rgs.resourceGroupCreate(rg, new 
org.apache.pulsar.common.policies.data.ResourceGroup());
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should remain null without attachments.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should remain null without attachments.");
+
+            // Calling periodic methods directly must not create schedulers
+            rgs.aggregateResourceGroupLocalUsages();
+            rgs.calculateQuotaForAllResourceGroups();
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should remain null after direct calls.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should remain null after direct calls.");
+
+            // Dynamic config while stopped must not trigger scheduling
+            
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval + 7);
+            rgs.aggregateResourceGroupLocalUsages();
+            rgs.calculateQuotaForAllResourceGroups();
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should remain null after config change 
while stopped.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should remain null after config change while 
stopped.");
+
+            // Cleanup resource group
+            rgs.resourceGroupDelete(rg);
+            Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group 
should be deleted.");
+        } finally {
+            
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval);
+        }
+    }
+
+    /**
+     * Validates that attaching a tenant (without namespace attachment) to a 
ResourceGroup
+     * triggers scheduler initialization, and that detaching the last tenant 
stops the schedulers.
+     */
+    @Test(timeOut = 60000)
+    public void testStartOnTenantAttachment() throws Exception {
+        final String rg = "rg-tenant-only";
+        final String tenant = "t-attach";
+
+        try (ResourceGroupService rgs = createResourceGroupService()) {
+            rgs.resourceGroupCreate(rg, new 
org.apache.pulsar.common.policies.data.ResourceGroup());
+            rgs.registerTenant(rg, tenant);
+
+            Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should start on first tenant attachment.");
+            Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should start on first tenant attachment.");
+            
Assert.assertFalse(rgs.getAggregateLocalUsagePeriodicTask().isCancelled(),
+                    "Aggregate task should be running.");
+            
Assert.assertFalse(rgs.getCalculateQuotaPeriodicTask().isCancelled(),
+                    "Quota task should be running.");
+
+            // Detach and ensure schedulers stop
+            rgs.unRegisterTenant(rg, tenant);
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should be cleared after last detach.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should be cleared after last detach.");
+
+            rgs.resourceGroupDelete(rg);
+            Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group 
should be deleted.");
+        }
+    }
+
+    /**
+     * Tests scheduler lifecycle when a ResourceGroup has both tenant and 
namespace attachments.
+     * Verifies that schedulers remain active as long as ANY attachment 
exists, and only stop
+     * when the last attachment (tenant or namespace) is removed.
+     */
+    @Test(timeOut = 60000)
+    public void testStopOnLastDetachWithMixedRefs() throws Exception {
+        final String rg = "rg-mixed";
+        final String tenant = "t-mixed";
+        final NamespaceName ns = NamespaceName.get("t-mixed/ns1");
+
+        try (ResourceGroupService rgs = createResourceGroupService()) {
+            rgs.resourceGroupCreate(rg, new 
org.apache.pulsar.common.policies.data.ResourceGroup());
+
+            // Attach both a tenant and a namespace
+            rgs.registerTenant(rg, tenant);
+            rgs.registerNameSpace(rg, ns);
+
+            Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(), 
"Aggregate task should be started.");
+            Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(), "Quota 
task should be started.");
+
+            // Remove one reference. Tasks should still be present.
+            rgs.unRegisterTenant(rg, tenant);
+            Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should remain with remaining namespace 
attachment.");
+            Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should remain with remaining namespace 
attachment.");
+
+            // Remove last reference. Tasks should stop.
+            rgs.unRegisterNameSpace(rg, ns);
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(),
+                    "Aggregate task should be cleared after last detach.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(),
+                    "Quota task should be cleared after last detach.");
+
+            rgs.resourceGroupDelete(rg);
+            Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group 
should be deleted.");
+        }
+    }
+
+    /**
+     * Ensures that dynamic configuration changes to the publish interval do 
NOT cause
+     * scheduler initialization when no attachments exist. This validates the 
guard logic
+     * that prevents spurious rescheduling attempts on stopped schedulers.
+     */
+    @Test(timeOut = 60000)
+    public void testNoRescheduleWhenStopped() throws Exception {
+        final int oldInterval = 
this.conf.getResourceUsageTransportPublishIntervalInSecs();
+
+        try (ResourceGroupService rgs = createResourceGroupService()) {
+            // Ensure stopped state
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), 
"Aggregate task should be null.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task 
should be null.");
+
+            // Change interval while stopped
+            
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval + 13);
+
+            // Call both periodic methods directly. Futures must remain null.
+            rgs.aggregateResourceGroupLocalUsages();
+            rgs.calculateQuotaForAllResourceGroups();
+            Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), 
"Aggregate task should remain null.");
+            Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task 
should remain null.");
+        } finally {
+            
this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval);
+        }
+    }
+
+
 }
\ No newline at end of file

Reply via email to